-
Notifications
You must be signed in to change notification settings - Fork 12
/
appender.go
138 lines (107 loc) · 3.46 KB
/
appender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package persistence
import (
	// You will need to make sure this import exists for side effects:
	// _ "github.com/influxdata/influxdb/tsdb/engine"
	// the go linter in some instances removes it

	"errors"
	"sync"
	"time"
	"unicode/utf8"

	_ "github.com/influxdata/influxdb/tsdb/engine"
	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"
	"github.com/prometheus/prometheus/storage"

	"github.com/cloudfoundry/metric-store-release/src/internal/metrics"
	"github.com/cloudfoundry/metric-store-release/src/pkg/logger"
	"github.com/cloudfoundry/metric-store-release/src/pkg/persistence/transform"
	"github.com/cloudfoundry/metric-store-release/src/pkg/rpc"
)
// Adapter is the write destination an Appender flushes to: Commit hands the
// buffered points to WritePoints in a single call.
type Adapter interface {
// WritePoints persists the given points and reports any failure as an error.
WritePoints(points []*rpc.Point) error
}
// Appender buffers points added through Append and writes them to its Adapter
// when Commit is called.
type Appender struct {
// mu is held by Append and Commit while they touch points.
mu sync.Mutex
// points holds the points appended since the buffer was last emptied by Commit.
points []*rpc.Point
// adapter receives the buffered points on Commit.
adapter Adapter
// labelTruncationLength is the byte length above which cleanLabels shortens
// a label value (NewAppender defaults it to 256).
labelTruncationLength uint
// log is set by NewAppender to logger.NewNop() unless WithAppenderLogger overrides it.
log *logger.Logger
// metrics is where Commit records write duration and written point counts.
metrics metrics.Registrar
}
// NewAppender returns an Appender that writes to adapter and records its
// measurements on metrics. It starts with the logger from logger.NewNop(), a
// label truncation length of 256 and an empty point buffer; each option in
// opts is then applied, in order, and may override those defaults.
func NewAppender(adapter Adapter, metrics metrics.Registrar, opts ...AppenderOption) *Appender {
	a := new(Appender)
	a.adapter = adapter
	a.metrics = metrics
	a.log = logger.NewNop()
	a.labelTruncationLength = 256
	a.points = []*rpc.Point{}

	for i := range opts {
		opts[i](a)
	}

	return a
}
// AppenderOption customizes an Appender. NewAppender calls every option it is
// given on the Appender it is constructing, after the defaults have been set.
type AppenderOption func(*Appender)
// WithLabelTruncationLength returns an option that sets the length, in bytes,
// above which label values are shortened when samples are appended.
func WithLabelTruncationLength(length uint) AppenderOption {
	setLength := func(target *Appender) {
		target.labelTruncationLength = length
	}
	return setLength
}
// WithAppenderLogger returns an option that replaces the Appender's logger
// with log.
func WithAppenderLogger(log *logger.Logger) AppenderOption {
	setLogger := func(target *Appender) {
		target.log = log
	}
	return setLogger
}
// Append buffers one sample for the next Commit.
//
// The point's name is taken from the metric-name label of l, its remaining
// labels are passed through cleanLabels, and time and value are stored as
// given. A value rejected by transform.IsValidFloat is not buffered and an
// error is returned instead. ref is ignored and the returned reference is
// always 0.
func (a *Appender) Append(ref storage.SeriesRef, l labels.Labels, time int64, value float64) (storage.SeriesRef, error) {
	a.mu.Lock()
	defer a.mu.Unlock()

	if valid := transform.IsValidFloat(value); !valid {
		return 0, errors.New("NaN float cannot be added")
	}

	a.points = append(a.points, &rpc.Point{
		Name:      l.Get(labels.MetricName),
		Timestamp: time,
		Value:     value,
		Labels:    a.cleanLabels(l),
	})

	return 0, nil
}
// AppendExemplar is a no-op: the exemplar is dropped and a zero series
// reference is returned with a nil error. Use Append to record samples.
func (a *Appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	var noRef storage.SeriesRef
	return noRef, nil
}
// AppendHistogram is a no-op: both histogram arguments are dropped and a zero
// series reference is returned with a nil error.
func (a *Appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	var noRef storage.SeriesRef
	return noRef, nil
}
// UpdateMetadata is a no-op: the metadata is dropped and a zero series
// reference is returned with a nil error.
func (a *Appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
	var noRef storage.SeriesRef
	return noRef, nil
}
// Commit writes every buffered point to the adapter in one WritePoints call
// and then empties the buffer.
//
// When the write succeeds, the elapsed time is observed on the write-duration
// histogram and the number of points is added to the written-points counter.
// When it fails, the adapter's error is returned and no metrics are recorded.
// The buffer is emptied in both cases, so points from a failed write are not
// retried by a later Commit.
func (a *Appender) Commit() error {
	a.mu.Lock()
	defer a.mu.Unlock()

	batch := a.points
	begun := time.Now()

	if writeErr := a.adapter.WritePoints(batch); writeErr != nil {
		a.points = batch[:0]
		return writeErr
	}

	elapsed := transform.DurationToSeconds(time.Since(begun))
	a.metrics.Histogram(metrics.MetricStoreWriteDurationSeconds).Observe(elapsed)
	a.metrics.Add(metrics.MetricStoreWrittenPointsTotal, float64(len(batch)))

	// Keep the backing array so the next round of appends can reuse it.
	a.points = batch[:0]
	return nil
}
// Rollback discards every point buffered since the buffer was last emptied
// and returns nil; nothing is written to the adapter.
//
// It used to panic with "not implemented", which would take down any caller
// that rolls back after a failed append. Dropping the pending points is the
// whole of a rollback here, because points only reach the adapter in Commit.
func (a *Appender) Rollback() error {
	a.mu.Lock()
	defer a.mu.Unlock()

	// Truncate in place, the same way Commit resets the buffer, so the
	// backing array is kept for later appends.
	a.points = a.points[:0]
	return nil
}
// cleanLabels returns the labels of l as a map, with the metric-name label
// removed and every value longer than a.labelTruncationLength bytes shortened
// to at most that many bytes.
//
// The cut never lands inside a multi-byte UTF-8 character: if the byte at the
// limit is a continuation byte, the cut moves back to the start of that
// character, so a valid value stays valid after truncation. Values made only
// of single-byte characters are cut at exactly the limit.
func (a *Appender) cleanLabels(l labels.Labels) map[string]string {
	limit := a.labelTruncationLength
	cleaned := l.Map()

	for name, value := range cleaned {
		if uint(len(value)) <= limit {
			continue
		}

		// len(value) > limit here, so limit fits in an int and value[cut] is
		// always in range.
		cut := int(limit)

		// Step back to a rune boundary. A UTF-8 character has at most
		// utf8.UTFMax-1 continuation bytes, so bounding the walk keeps
		// malformed input from being shortened more than that.
		for backed := 0; backed < utf8.UTFMax-1 && cut > 0 && !utf8.RuneStart(value[cut]); backed++ {
			cut--
		}

		// Overwriting an existing key while ranging over the map is safe.
		cleaned[name] = value[:cut]
	}

	delete(cleaned, labels.MetricName)
	return cleaned
}