forked from grafana/loki
/
replace.go
221 lines (194 loc) · 6.17 KB
/
replace.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package stages
import (
"bytes"
"fmt"
"reflect"
"regexp"
"text/template"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
// Config Errors returned while validating a replace stage configuration.
const (
	// ErrEmptyReplaceStageConfig is used when no replace configuration was supplied (nil config).
	ErrEmptyReplaceStageConfig = "empty replace stage configuration"
	// ErrEmptyReplaceStageSource is used when a source key is set but is the empty string.
	ErrEmptyReplaceStageSource = "empty source in replace stage"
)
// ReplaceConfig contains a replace stage configuration, decoded via the
// mapstructure tags below.
type ReplaceConfig struct {
	// Expression is the regular expression whose capture groups are replaced.
	// It is required.
	Expression string `mapstructure:"expression"`
	// Source, when set, names the extracted value to rewrite; when nil the log
	// entry itself is rewritten. If set it must not be the empty string.
	Source *string `mapstructure:"source"`
	// Replace is a text/template string executed once per capture group; the
	// captured text is available to it as {{ .Value }}, together with the
	// string-convertible extracted values.
	Replace string `mapstructure:"replace"`
}
// validateReplaceConfig checks c and, if it is usable, returns its compiled
// Expression.
//
// Checks run in this order: the config must be non-nil, Expression must be
// non-empty, and a configured Source must not be the empty string. A regex
// that fails to compile is reported wrapped with ErrCouldNotCompileRegex.
func validateReplaceConfig(c *ReplaceConfig) (*regexp.Regexp, error) {
	switch {
	case c == nil:
		return nil, errors.New(ErrEmptyReplaceStageConfig)
	case c.Expression == "":
		return nil, errors.New(ErrExpressionRequired)
	case c.Source != nil && *c.Source == "":
		return nil, errors.New(ErrEmptyReplaceStageSource)
	}

	re, compileErr := regexp.Compile(c.Expression)
	if compileErr != nil {
		return nil, errors.Wrap(compileErr, ErrCouldNotCompileRegex)
	}
	return re, nil
}
// replaceStage rewrites the log entry (or, when configured, one extracted
// value) by replacing each regular-expression capture group with the output
// of a user-supplied template. Named capture groups are also written back
// into the extracted map.
type replaceStage struct {
	cfg        *ReplaceConfig // parsed and validated stage configuration
	expression *regexp.Regexp // compiled form of cfg.Expression
	logger     log.Logger
}
// newReplaceStage decodes config into a ReplaceConfig, validates it and
// returns the resulting replace Stage. Decoding and validation errors are
// returned unchanged.
func newReplaceStage(logger log.Logger, config interface{}) (Stage, error) {
	cfg, err := parseReplaceConfig(config)
	if err != nil {
		return nil, err
	}

	re, err := validateReplaceConfig(cfg)
	if err != nil {
		return nil, err
	}

	stage := &replaceStage{
		logger:     log.With(logger, "component", "stage", "type", "replace"),
		cfg:        cfg,
		expression: re,
	}
	return toStage(stage), nil
}
// parseReplaceConfig decodes a raw configuration value into a ReplaceConfig
// using mapstructure. On failure the decode error is returned as-is.
func parseReplaceConfig(config interface{}) (*ReplaceConfig, error) {
	var parsed ReplaceConfig
	if decodeErr := mapstructure.Decode(config, &parsed); decodeErr != nil {
		return nil, decodeErr
	}
	return &parsed, nil
}
// Process implements Stage.
//
// It rewrites either the extracted value named by cfg.Source (when set) or
// the log entry itself: every capture group of every regex match is replaced
// by the output of the user's "replace" template, executed with the captured
// text as {{ .Value }} plus all string-convertible extracted values.
// Named capture groups that participated in the first (leftmost) match are
// also stored in extracted, holding their replaced value.
//
// On any failure (missing or non-string source, nil entry, no match,
// template parse/execute error) neither the entry nor extracted is modified;
// the reason is logged only when Debug is enabled.
func (r *replaceStage) Process(_ model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) {
	// If a source key is provided, the replace stage processes that value
	// from the extracted map; otherwise it falls back to the entry.
	input := entry
	if r.cfg.Source != nil {
		raw, ok := extracted[*r.cfg.Source]
		if !ok {
			if Debug {
				level.Debug(r.logger).Log("msg", "source does not exist in the set of extracted values", "source", *r.cfg.Source)
			}
			return
		}
		value, err := getString(raw)
		if err != nil {
			if Debug {
				level.Debug(r.logger).Log("msg", "failed to convert source value to string", "source", *r.cfg.Source, "err", err, "type", reflect.TypeOf(raw))
			}
			return
		}
		input = &value
	}
	if input == nil {
		if Debug {
			level.Debug(r.logger).Log("msg", "cannot parse a nil entry")
		}
		return
	}

	// Snapshot the input: without a source, input aliases entry, and *entry
	// is overwritten below before the named groups are read back.
	text := *input

	// One regex pass yields every match; matchAllIndex[0] is the leftmost
	// match, i.e. the same match FindStringSubmatch would return.
	matchAllIndex := r.expression.FindAllStringSubmatchIndex(text, -1)
	if matchAllIndex == nil {
		if Debug {
			level.Debug(r.logger).Log("msg", "regex did not match", "input", text, "regex", r.expression)
		}
		return
	}

	// All extracted values will be available for templating.
	td := r.getTemplateData(extracted)

	// Initialize the template with the "replace" string defined by user.
	// NOTE(review): this is re-parsed for every entry; parsing once at stage
	// construction would avoid the per-line cost.
	templ, err := template.New("pipeline_template").Funcs(functionMap).Parse(r.cfg.Replace)
	if err != nil {
		if Debug {
			level.Debug(r.logger).Log("msg", "template initialization error", "err", err)
		}
		return
	}

	result, capturedMap, err := r.getReplacedEntry(matchAllIndex, text, td, templ)
	if err != nil {
		if Debug {
			level.Debug(r.logger).Log("msg", "failed to execute template on extracted value", "err", err)
		}
		return
	}

	if r.cfg.Source != nil {
		extracted[*r.cfg.Source] = result
	} else {
		*entry = result
	}

	// Export the named groups of the first match. Groups that did not take
	// part in the match (index -1) are skipped so they cannot inherit the
	// replacement of an unrelated group that captured the empty string.
	firstMatch := matchAllIndex[0]
	for i, name := range r.expression.SubexpNames() {
		if i == 0 || name == "" {
			continue
		}
		start, end := firstMatch[2*i], firstMatch[2*i+1]
		if start < 0 {
			continue
		}
		if v, ok := capturedMap[text[start:end]]; ok {
			extracted[name] = v
		}
	}

	if Debug {
		level.Debug(r.logger).Log("msg", "extracted data debug in replace stage", "extracted data", fmt.Sprintf("%v", extracted))
	}
}
// getReplacedEntry rebuilds input with every capture group of every match
// replaced by the output of templ, executed with td["Value"] set to the
// captured text.
//
// matchAllIndex is the result of FindAllStringSubmatchIndex on input. For a
// string like `11.11.11.11 - frank 12.12.12.12 - frank` and the regex
// "(\\d{2}.\\d{2}.\\d{2}.\\d{2}) - (\\S+)" it is
// [[0 19 0 11 14 19] [20 37 20 31 34 37]]: the first pair of each inner
// slice spans the whole match and each following pair spans one capture
// group (-1 when the group did not participate). Here 0-11 is "11.11.11.11"
// and 14-19 is "frank".
//
// A group that starts before the end of the last spliced span (e.g. a group
// nested inside an already-replaced one) is not spliced into the output
// again, but its replacement is still recorded.
//
// It returns the rewritten string, a map from each captured string to its
// replacement, and the first template execution error, if any. td is
// mutated: its "Value" key is overwritten for every group.
func (r *replaceStage) getReplacedEntry(matchAllIndex [][]int, input string, td map[string]string, templ *template.Template) (string, map[string]string, error) {
	// Build the output in a buffer rather than with repeated string
	// concatenation, which is quadratic in the number of groups.
	var result, rendered bytes.Buffer
	result.Grow(len(input))
	previousInputEndIndex := 0
	capturedMap := make(map[string]string)

	for _, matchIndex := range matchAllIndex {
		// Skip the whole-match pair at [0:2]; walk the capture-group pairs.
		for i := 2; i+1 < len(matchIndex); i += 2 {
			start, end := matchIndex[i], matchIndex[i+1]
			if start == -1 {
				continue
			}
			capturedString := input[start:end]

			td["Value"] = capturedString
			rendered.Reset()
			if err := templ.Execute(&rendered, td); err != nil {
				return "", nil, err
			}
			st := rendered.String()

			// Splice only groups that begin at or after the end of the last
			// replaced span; overlapping (nested) groups are already covered.
			if previousInputEndIndex <= start {
				result.WriteString(input[previousInputEndIndex:start])
				result.WriteString(st)
				previousInputEndIndex = end
			}
			capturedMap[capturedString] = st
		}
	}

	result.WriteString(input[previousInputEndIndex:])
	return result.String(), capturedMap, nil
}
// getTemplateData converts extracted into the string map handed to the
// replace template. Values that getString cannot convert are left out
// (and reported when Debug is on) instead of aborting the stage.
func (r *replaceStage) getTemplateData(extracted map[string]interface{}) map[string]string {
	data := make(map[string]string)
	for key, raw := range extracted {
		str, convErr := getString(raw)
		if convErr == nil {
			data[key] = str
			continue
		}
		if Debug {
			level.Debug(r.logger).Log("msg", "extracted template could not be converted to a string", "err", convErr, "type", reflect.TypeOf(raw))
		}
	}
	return data
}
// Name implements Stage. It reports StageTypeReplace as the stage type name.
func (r *replaceStage) Name() string {
	return StageTypeReplace
}