-
Notifications
You must be signed in to change notification settings - Fork 92
/
appender.go
123 lines (103 loc) · 2.92 KB
/
appender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package pyroscope
import (
"context"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
)
const (
LabelNameDelta = "__delta__"
)
var NoopAppendable = AppendableFunc(func(_ context.Context, _ labels.Labels, _ []*RawSample) error { return nil })
type Appendable interface {
Appender() Appender
}
type Appender interface {
Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error
}
type RawSample struct {
// raw_profile is the set of bytes of the pprof profile
RawProfile []byte
}
var _ Appendable = (*Fanout)(nil)
// Fanout supports the default Alloy style of appendables since it can go to multiple outputs. It also allows the intercepting of appends.
type Fanout struct {
mut sync.RWMutex
// children is where to fan out.
children []Appendable
// ComponentID is what component this belongs to.
componentID string
writeLatency prometheus.Histogram
}
// NewFanout creates a fanout appendable.
func NewFanout(children []Appendable, componentID string, register prometheus.Registerer) *Fanout {
wl := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "pyroscope_fanout_latency",
Help: "Write latency for sending to pyroscope profiles",
})
_ = register.Register(wl)
return &Fanout{
children: children,
componentID: componentID,
writeLatency: wl,
}
}
// UpdateChildren allows changing of the children of the fanout.
func (f *Fanout) UpdateChildren(children []Appendable) {
f.mut.Lock()
defer f.mut.Unlock()
f.children = children
}
// Children returns the children of the fanout.
func (f *Fanout) Children() []Appendable {
f.mut.Lock()
defer f.mut.Unlock()
return f.children
}
// Appender satisfies the Appendable interface.
func (f *Fanout) Appender() Appender {
f.mut.RLock()
defer f.mut.RUnlock()
app := &appender{
children: make([]Appender, 0),
componentID: f.componentID,
writeLatency: f.writeLatency,
}
for _, x := range f.children {
if x == nil {
continue
}
app.children = append(app.children, x.Appender())
}
return app
}
var _ Appender = (*appender)(nil)
type appender struct {
children []Appender
componentID string
writeLatency prometheus.Histogram
}
// Append satisfies the Appender interface.
func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
now := time.Now()
defer func() {
a.writeLatency.Observe(time.Since(now).Seconds())
}()
var multiErr error
for _, x := range a.children {
err := x.Append(ctx, labels, samples)
if err != nil {
multiErr = multierror.Append(multiErr, err)
}
}
return multiErr
}
type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error
func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
return f(ctx, labels, samples)
}
func (f AppendableFunc) Appender() Appender {
return f
}