/
cached_rawstore.go
146 lines (124 loc) · 4.42 KB
/
cached_rawstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package storage
import (
"bytes"
"context"
"io"
"runtime/debug"
"time"
"github.com/coocood/freecache"
"github.com/prometheus/client_golang/prometheus"
"github.com/flyteorg/flyte/flytestdlib/errors"
"github.com/flyteorg/flyte/flytestdlib/ioutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/otelutils"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)
const (
	// neverExpire is the expiry (in seconds) passed to freecache's Set to store an
	// entry without a TTL.
	neverExpire = 0
)
// cacheMetrics groups the Prometheus instruments emitted by cachedRawStore.
//
// TODO: freecache computes a number of statistics of its own; write a Prometheus
// collector to publish them.
type cacheMetrics struct {
	// CacheHit and CacheMiss record whether a cache lookup (or cache delete) found an entry.
	CacheHit, CacheMiss prometheus.Counter
	// CacheWriteError is incremented when storing an entry in the cache fails.
	CacheWriteError prometheus.Counter
	// FetchLatency times reading an object body from the underlying store on a cache miss.
	FetchLatency promutils.StopWatch
}
// cachedRawStore decorates a RawStore with an in-memory freecache. Any RawStore
// method it does not override is promoted unchanged from the embedded store.
type cachedRawStore struct {
	RawStore // wrapped store; consulted on cache misses, writes and deletes

	cache   *freecache.Cache // object bytes keyed by the DataReference string
	metrics *cacheMetrics    // hit/miss/write-error counters and fetch latency
}
// Head reports metadata for reference. If the object is present in the cache, the
// underlying store is not contacted: the returned metadata marks the object as
// existing and reports the cached byte length as its size. Otherwise the call is
// delegated to the wrapped RawStore.
func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Metadata, error) {
	ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Head")
	defer span.End()

	cached, lookupErr := s.cache.Get([]byte(reference))
	if lookupErr != nil {
		s.metrics.CacheMiss.Inc()
		return s.RawStore.Head(ctx, reference)
	}

	s.metrics.CacheHit.Inc()
	return StowMetadata{exists: true, size: int64(len(cached))}, nil
}
// ReadRaw returns the bytes stored at reference.
//
// A cache hit is served directly from memory. On a miss the object is read in full
// from the wrapped RawStore (timed by FetchLatency), inserted into the cache, and
// returned as an in-memory reader.
//
// If the object was fetched but could not be cached, the data is still returned
// together with an error wrapping ErrFailedToWriteCache, and CacheWriteError is
// incremented. Errors from the underlying read are returned with a nil reader.
func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
	ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/ReadRaw")
	defer span.End()

	key := []byte(reference)
	if oRaw, err := s.cache.Get(key); err == nil {
		// Found, Cache hit
		s.metrics.CacheHit.Inc()
		return ioutils.NewBytesReadCloser(oRaw), nil
	}

	s.metrics.CacheMiss.Inc()
	reader, err := s.RawStore.ReadRaw(ctx, reference)
	if err != nil {
		return nil, err
	}

	defer func() {
		// Use a dedicated variable so the close error never aliases the function's err.
		if closeErr := reader.Close(); closeErr != nil {
			logger.Warnf(ctx, "Failed to close reader [%v]. Error: %v", reference, closeErr)
		}
	}()

	b, err := ioutils.ReadAll(reader, s.metrics.FetchLatency.Start())
	if err != nil {
		return nil, err
	}

	if err = s.cache.Set(key, b, neverExpire); err != nil {
		// Keep the metric consistent with WriteRaw, which counts every failed cache insert.
		s.metrics.CacheWriteError.Inc()
		logger.Debugf(ctx, "Failed to Cache the metadata")
		return ioutils.NewBytesReadCloser(b), errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata")
	}

	return ioutils.NewBytesReadCloser(b), nil
}
// WriteRaw writes raw to the wrapped RawStore and, once that succeeds, caches the
// bytes the store consumed under reference.
//
// The payload is teed into an in-memory buffer while the underlying store reads
// raw, so the whole object is held in memory for the duration of the call.
// NOTE(review): this assumes the underlying store consumes raw completely; if it
// stops early, only the consumed prefix is cached.
//
// If the underlying write fails, its error is returned and the cache is not
// modified. If caching fails (freecache rejects, for example, entries that are too
// large), any previously cached value for reference is evicted so readers fall
// through to the store instead of seeing the pre-write bytes. CacheWriteError is
// incremented and an error wrapping ErrFailedToWriteCache is returned, even though
// the remote write itself succeeded.
func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error {
	ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/WriteRaw")
	defer span.End()

	var buf bytes.Buffer
	teeReader := io.TeeReader(raw, &buf)
	if err := s.RawStore.WriteRaw(ctx, reference, size, opts, teeReader); err != nil {
		return err
	}

	key := []byte(reference)
	if err := s.cache.Set(key, buf.Bytes(), neverExpire); err != nil {
		// The remote object has changed but the new bytes were not cached. A failed Set
		// can leave an older entry for this key in place, so drop it to avoid stale reads.
		s.cache.Del(key)
		s.metrics.CacheWriteError.Inc()
		return errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata")
	}

	return nil
}
// Delete evicts reference from the in-memory cache and then deletes it from the
// wrapped RawStore, returning the store's error (if any).
//
// CacheHit/CacheMiss are incremented depending on whether an entry was actually
// present in the cache. Like the other overridden methods, the call is traced with
// its own span.
func (s *cachedRawStore) Delete(ctx context.Context, reference DataReference) error {
	ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Delete")
	defer span.End()

	if s.cache.Del([]byte(reference)) {
		s.metrics.CacheHit.Inc()
	} else {
		s.metrics.CacheMiss.Inc()
	}

	return s.RawStore.Delete(ctx, reference)
}
// newCacheMetrics registers the cache hit/miss/write-error counters and the
// remote-fetch stopwatch under scope and returns them bundled together.
// NOTE(review): the Must* constructors presumably panic on registration failure
// (e.g. a duplicate name in the same scope) — confirm in promutils.
func newCacheMetrics(scope promutils.Scope) *cacheMetrics {
	m := &cacheMetrics{}
	// Registration order matches the original composite literal.
	m.FetchLatency = scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond)
	m.CacheHit = scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache")
	m.CacheMiss = scope.MustNewCounter("cache_miss", "Number of times metadata was not found in cache and remote fetch was required")
	m.CacheWriteError = scope.MustNewCounter("cache_write_err", "Failed to write to cache")
	return m
}
// newCachedRawStore wraps store in a cachedRawStore backed by a freecache of
// cfg.Cache.MaxSizeMegabytes MiB. When that size is not positive, caching is
// disabled and store is returned as-is.
//
// When caching is enabled and cfg.Cache.TargetGCPercent is positive, it is applied
// process-wide via debug.SetGCPercent before the cache is allocated.
func newCachedRawStore(cfg *Config, store RawStore, metrics *cacheMetrics) RawStore {
	if cfg.Cache.MaxSizeMegabytes <= 0 {
		return store
	}

	if gcPercent := cfg.Cache.TargetGCPercent; gcPercent > 0 {
		debug.SetGCPercent(gcPercent)
	}

	const bytesPerMegabyte = 1024 * 1024
	return &cachedRawStore{
		RawStore: store,
		cache:    freecache.NewCache(cfg.Cache.MaxSizeMegabytes * bytesPerMegabyte),
		metrics:  metrics,
	}
}