/
storage_put.go
111 lines (98 loc) · 2.68 KB
/
storage_put.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package storage
import (
"context"
"fmt"
"math/big"
"time"
"github.com/sirupsen/logrus"
"github.com/pyroscope-io/pyroscope/pkg/storage/dimension"
"github.com/pyroscope-io/pyroscope/pkg/storage/metadata"
"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
"github.com/pyroscope-io/pyroscope/pkg/storage/tree"
)
type PutInput struct {
StartTime time.Time
EndTime time.Time
Key *segment.Key
Val *tree.Tree
SpyName string
SampleRate uint32
Units metadata.Units
AggregationType metadata.AggregationType
}
func (s *Storage) Put(ctx context.Context, pi *PutInput) error {
if s.hc.IsOutOfDiskSpace() {
return errOutOfSpace
}
if pi.StartTime.Before(s.retentionPolicy().LowerTimeBoundary()) {
return errRetention
}
s.putTotal.Inc()
if pi.Key.HasProfileID() {
if err := s.ensureAppSegmentExists(pi); err != nil {
return err
}
return s.exemplars.insert(ctx, pi)
}
s.logger.WithFields(logrus.Fields{
"startTime": pi.StartTime.String(),
"endTime": pi.EndTime.String(),
"key": pi.Key.Normalized(),
"samples": pi.Val.Samples(),
"units": pi.Units,
"aggregationType": pi.AggregationType,
}).Debug("storage.Put")
if err := s.labels.PutLabels(pi.Key.Labels()); err != nil {
return fmt.Errorf("unable to write labels: %w", err)
}
sk := pi.Key.SegmentKey()
for k, v := range pi.Key.Labels() {
key := k + ":" + v
r, err := s.dimensions.GetOrCreate(key)
if err != nil {
s.logger.Errorf("dimensions cache for %v: %v", key, err)
continue
}
r.(*dimension.Dimension).Insert([]byte(sk))
s.dimensions.Put(key, r)
}
r, err := s.segments.GetOrCreate(sk)
if err != nil {
return fmt.Errorf("segments cache for %v: %v", sk, err)
}
st := r.(*segment.Segment)
st.SetMetadata(metadata.Metadata{
SpyName: pi.SpyName,
SampleRate: pi.SampleRate,
Units: pi.Units,
AggregationType: pi.AggregationType,
})
samples := pi.Val.Samples()
err = st.Put(pi.StartTime, pi.EndTime, samples, func(depth int, t time.Time, r *big.Rat, addons []segment.Addon) {
tk := pi.Key.TreeKey(depth, t)
res, err := s.trees.GetOrCreate(tk)
if err != nil {
s.logger.Errorf("trees cache for %v: %v", tk, err)
return
}
cachedTree := res.(*tree.Tree)
treeClone := pi.Val.Clone(r)
for _, addon := range addons {
if res, ok := s.trees.Lookup(pi.Key.TreeKey(addon.Depth, addon.T)); ok {
ta := res.(*tree.Tree)
ta.RLock()
treeClone.Merge(ta)
ta.RUnlock()
}
}
cachedTree.Lock()
cachedTree.Merge(treeClone)
cachedTree.Unlock()
s.trees.Put(tk, cachedTree)
})
if err != nil {
return err
}
s.segments.Put(sk, st)
return nil
}