/
update_tag_value.go
165 lines (130 loc) · 3.92 KB
/
update_tag_value.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package rules
import (
"log"
"github.com/Abc-Arbitrage/infix/filter"
"github.com/Abc-Arbitrage/infix/logging"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/Abc-Arbitrage/infix/storage"
)
// UpdateTagValueRule defines a rule to update the value of a tag for a given measurement
type UpdateTagValueRule struct {
measurementFilter filter.Filter
keyFilter filter.Filter
valueFilter filter.Filter
renameFn RenameFn
check bool
logger *log.Logger
}
// UpdateTagValueRuleConfig represents the toml configuration of UpdateTagValue rule
type UpdateTagValueRuleConfig struct {
Measurement filter.Filter
Key filter.Filter
Value filter.Filter
To string
}
// NewUpdateTagValueRule creates a new UpdateTagValueRule
func NewUpdateTagValueRule(measurementFilter filter.Filter, keyFilter filter.Filter, valueFilter filter.Filter, renameFn RenameFn) *UpdateTagValueRule {
return &UpdateTagValueRule{
measurementFilter: measurementFilter,
keyFilter: keyFilter,
valueFilter: valueFilter,
renameFn: renameFn,
check: false,
logger: logging.GetLogger("UpdateTagValueRule"),
}
}
// CheckMode sets the check mode on the rule
func (r *UpdateTagValueRule) CheckMode(check bool) {
r.check = check
}
// Flags implements Rule interface
func (r *UpdateTagValueRule) Flags() int {
return Standard
}
// WithLogger sets the logger on the rule
func (r *UpdateTagValueRule) WithLogger(logger *log.Logger) {
r.logger = logger
}
// FilterKey implements Rule interface
func (r *UpdateTagValueRule) FilterKey(key []byte) bool {
return false
}
// Start implements Rule interface
func (r *UpdateTagValueRule) Start() {
}
// End implements Rule interface
func (r *UpdateTagValueRule) End() {
}
// StartShard implements Rule interface
func (r *UpdateTagValueRule) StartShard(info storage.ShardInfo) bool {
return true
}
// EndShard implements Rule interface
func (r *UpdateTagValueRule) EndShard() error {
return nil
}
// StartTSM implements Rule interface
func (r *UpdateTagValueRule) StartTSM(path string) bool {
return true
}
// EndTSM implements Rule interface
func (r *UpdateTagValueRule) EndTSM() {
}
// StartWAL implements Rule interface
func (r *UpdateTagValueRule) StartWAL(path string) bool {
return true
}
// EndWAL implements Rule interface
func (r *UpdateTagValueRule) EndWAL() {
}
// Apply implements Rule interface
func (r *UpdateTagValueRule) Apply(key []byte, values []tsm1.Value) ([]byte, []tsm1.Value, error) {
if r.measurementFilter.Filter(key) {
seriesKey, field := tsm1.SeriesAndFieldFromCompositeKey(key)
measurement, tags := models.ParseKey(seriesKey)
var newTags models.Tags
for _, tag := range tags {
newTag := tag.Clone()
if r.keyFilter.Filter(tag.Key) && r.valueFilter.Filter(tag.Value) {
newTagValue := r.renameFn(string(tag.Value))
r.logger.Printf("Updating tag for measurement '%s' %s=%s to %s=%s", measurement, tag.Key, tag.Value, tag.Key, newTagValue)
newTag.Value = []byte(newTagValue)
}
newTags = append(newTags, newTag)
}
newKey := models.MakeKey([]byte(measurement), newTags)
newSeriesKey := tsm1.SeriesFieldKeyBytes(string(newKey), string(field))
return newSeriesKey, values, nil
}
return key, values, nil
}
// Sample implements Config interface
func (c *UpdateTagValueRuleConfig) Sample() string {
return `
to="aws-$1"
[measurement.strings]
hasprefix="linux."
[key.strings]
equal="region"
[value.pattern]
pattern="amazon-(.*)"
`
}
// Build implements Config interface
func (c *UpdateTagValueRuleConfig) Build() (Rule, error) {
if c.To == "" {
return nil, ErrMissingRenameTo
}
if c.Measurement == nil {
return nil, ErrMissingMeasurementFilter
}
if c.Key == nil {
return nil, ErrMissingTagKeyFilter
}
if c.Value == nil {
return nil, ErrMissingTagValueFilter
}
renameFn := RenameFnFromFilter(c.Value, c.To)
return NewUpdateTagValueRule(c.Measurement, c.Key, c.Value, renameFn), nil
}