-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
match.go
100 lines (87 loc) · 2.9 KB
/
match.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package stages
import (
"time"
"github.com/go-kit/kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/logql"
)
// Error messages produced while validating a match stage configuration.
// They are plain string constants (not error values) and are listed in the
// order validateMatcherConfig checks for them.
const (
	// ErrEmptyMatchStageConfig is reported when no match stage config is supplied.
	ErrEmptyMatchStageConfig = "match stage config cannot be empty"

	// ErrPipelineNameRequired is reported when pipeline_name is present but set
	// to the empty string.
	ErrPipelineNameRequired = "match stage pipeline name can be omitted but cannot be an empty string"

	// ErrSelectorRequired is reported when the selector is empty.
	ErrSelectorRequired = "selector statement required for match stage"

	// ErrMatchRequiresStages is reported when the match stage has no nested stages.
	ErrMatchRequiresStages = "match stage requires at least one additional stage to be defined in '- stages'"

	// ErrSelectorSyntax is used to wrap the parser error when the selector
	// cannot be parsed into label matchers.
	ErrSelectorSyntax = "invalid selector syntax for match stage"
)
// MatcherConfig contains the configuration for a matcherStage.
// It is populated from the raw stage config via mapstructure using the
// field tags below.
type MatcherConfig struct {
	// PipelineName is optional. When set it must not be the empty string;
	// newMatcherStage combines it with the job name to name the nested pipeline.
	PipelineName *string `mapstructure:"pipeline_name"`

	// Selector is the label selector that is parsed into label matchers.
	// It is required.
	Selector string `mapstructure:"selector"`

	// Stages holds the nested stages to run when the selector matches.
	// At least one stage is required.
	Stages PipelineStages `mapstructure:"stages"`
}
// validateMatcherConfig validates the MatcherConfig for the matcherStage and
// returns the label matchers parsed from its selector.
//
// Checks are performed in this order, and the first failure is returned:
//   - cfg is nil                          -> ErrEmptyMatchStageConfig
//   - PipelineName is set but empty       -> ErrPipelineNameRequired
//   - Selector is empty                   -> ErrSelectorRequired
//   - Stages is empty                     -> ErrMatchRequiresStages
//   - Selector cannot be parsed           -> parser error wrapped with ErrSelectorSyntax
//
// On any error the returned matcher slice is nil.
func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
	if cfg == nil {
		return nil, errors.New(ErrEmptyMatchStageConfig)
	}
	// A nil PipelineName means "not configured" and is allowed; only an
	// explicitly empty name is rejected.
	if cfg.PipelineName != nil && *cfg.PipelineName == "" {
		return nil, errors.New(ErrPipelineNameRequired)
	}
	if cfg.Selector == "" {
		return nil, errors.New(ErrSelectorRequired)
	}
	// len reports 0 for a nil value as well, so a separate nil comparison
	// is unnecessary.
	if len(cfg.Stages) == 0 {
		return nil, errors.New(ErrMatchRequiresStages)
	}

	matchers, err := logql.ParseMatchers(cfg.Selector)
	if err != nil {
		return nil, errors.Wrap(err, ErrSelectorSyntax)
	}
	return matchers, nil
}
// newMatcherStage creates a new matcherStage from config.
//
// The raw config is decoded into a MatcherConfig, validated, and its nested
// stages are built into a pipeline. The nested pipeline is given the name
// "<jobName>_<pipeline_name>" only when both jobName and the configured
// pipeline name are non-nil; otherwise it is created with a nil name.
// Decode and validation errors are returned as-is; a pipeline construction
// error is wrapped with the offending config.
func newMatcherStage(logger log.Logger, jobName *string, config interface{}, registerer prometheus.Registerer) (Stage, error) {
	var matchCfg MatcherConfig
	if decodeErr := mapstructure.Decode(config, &matchCfg); decodeErr != nil {
		return nil, decodeErr
	}

	selectors, err := validateMatcherConfig(&matchCfg)
	if err != nil {
		return nil, err
	}

	// Derive the nested pipeline's name; it stays nil unless both parts exist.
	var pipelineName *string
	if jobName != nil && matchCfg.PipelineName != nil {
		qualified := *jobName + "_" + *matchCfg.PipelineName
		pipelineName = &qualified
	}

	nested, err := NewPipeline(logger, matchCfg.Stages, pipelineName, registerer)
	if err != nil {
		return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config)
	}

	stage := &matcherStage{
		pipeline: nested,
		matchers: selectors,
	}
	return stage, nil
}
// matcherStage applies label matchers to determine if the included stages
// should be run.
type matcherStage struct {
	// matchers must all match an entry's labels for the nested pipeline to run.
	matchers []*labels.Matcher

	// pipeline is the nested set of stages executed when every matcher matches.
	pipeline Stage
}
// Process implements Stage.
//
// Each matcher is evaluated against the value of its label in the given label
// set; a label that is absent from the set is evaluated as the empty string.
// The nested pipeline runs only when every matcher matches; otherwise the
// call returns without touching the entry.
func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
	for i := range m.matchers {
		matcher := m.matchers[i]
		value := string(labels[model.LabelName(matcher.Name)])
		if matcher.Matches(value) {
			continue
		}
		// A single failed matcher is enough to skip the nested pipeline.
		return
	}
	m.pipeline.Process(labels, extracted, t, entry)
}
// Name implements Stage. It returns the match stage type identifier,
// StageTypeMatch.
func (m *matcherStage) Name() string {
	return StageTypeMatch
}