-
Notifications
You must be signed in to change notification settings - Fork 485
/
pipeline.go
194 lines (177 loc) · 5.94 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package stages
// This package is ported over from grafana/loki/clients/pkg/logentry/stages.
// We aim to port the stages in steps, to avoid introducing huge amounts of
// new code without being able to slowly review, examine and test them.
import (
"context"
"fmt"
"sync"
"github.com/go-kit/log"
"github.com/grafana/agent/component/common/loki"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)
// StageConfig defines a single stage in a processing pipeline.
// We define these as pointers types so we can use reflection to check that
// exactly one is set.
//
// Each field's river tag is the block name users write in configuration
// (e.g. `json { ... }` populates JSONConfig). All blocks are optional at the
// schema level; the exactly-one constraint is enforced elsewhere via
// reflection, per the comment above.
type StageConfig struct {
	JSONConfig         *JSONConfig         `river:"json,block,optional"`
	LogfmtConfig       *LogfmtConfig       `river:"logfmt,block,optional"`
	LabelsConfig       *LabelsConfig       `river:"labels,block,optional"`
	StructuredMetadata *LabelsConfig       `river:"structured_metadata,block,optional"`
	LabelAllowConfig   *LabelAllowConfig   `river:"label_keep,block,optional"`
	LabelDropConfig    *LabelDropConfig    `river:"label_drop,block,optional"`
	StaticLabelsConfig *StaticLabelsConfig `river:"static_labels,block,optional"`
	DockerConfig       *DockerConfig       `river:"docker,block,optional"`
	CRIConfig          *CRIConfig          `river:"cri,block,optional"`
	RegexConfig        *RegexConfig        `river:"regex,block,optional"`
	TimestampConfig    *TimestampConfig    `river:"timestamp,block,optional"`
	OutputConfig       *OutputConfig       `river:"output,block,optional"`
	ReplaceConfig      *ReplaceConfig      `river:"replace,block,optional"`
	MultilineConfig    *MultilineConfig    `river:"multiline,block,optional"`
	MatchConfig        *MatchConfig        `river:"match,block,optional"`
	DropConfig         *DropConfig         `river:"drop,block,optional"`
	PackConfig         *PackConfig         `river:"pack,block,optional"`
	TemplateConfig     *TemplateConfig     `river:"template,block,optional"`
	TenantConfig       *TenantConfig       `river:"tenant,block,optional"`
	LimitConfig        *LimitConfig        `river:"limit,block,optional"`
	MetricsConfig      *MetricsConfig      `river:"metrics,block,optional"`
	GeoIPConfig        *GeoIPConfig        `river:"geoip,block,optional"`
}
// Process-wide rate limiting applied by (*Pipeline).Wrap to entries leaving
// any pipeline; configured via SetReadLineRateLimiter.
//
// NOTE(review): these are mutable package globals written by
// SetReadLineRateLimiter and read by Wrap's forwarding goroutines without any
// synchronization — reconfiguring the limiter while pipelines are running is
// a data race. Confirm callers only set this before pipelines start.
var rateLimiter *rate.Limiter
var rateLimiterDrop bool
var rateLimiterDropReason = "global_rate_limiter_drop"
// Pipeline pass down a log entry to each stage for mutation and/or label extraction.
type Pipeline struct {
	logger log.Logger
	// stages run in order; each stage's output channel feeds the next (see Run).
	stages []Stage
	// jobName is forwarded to each stage on construction — presumably used to
	// scope per-job metrics; verify against New.
	jobName *string
	// dropCount counts entries discarded by the global rate limiter, labeled
	// by drop reason (see Wrap).
	dropCount *prometheus.CounterVec
}
// NewPipeline creates a new log entry pipeline from a configuration.
//
// Each StageConfig is turned into a Stage via New; the first invalid stage
// aborts construction with a wrapped error. The returned Pipeline logs under
// component=pipeline and registers its drop counter on registerer.
func NewPipeline(logger log.Logger, stages []StageConfig, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
	// Pre-size: exactly one Stage per StageConfig.
	st := make([]Stage, 0, len(stages))
	for _, stage := range stages {
		newStage, err := New(logger, jobName, stage, registerer)
		if err != nil {
			// Use ": %w" so the static text doesn't run directly into the
			// wrapped error's message.
			return nil, fmt.Errorf("invalid stage config: %w", err)
		}
		st = append(st, newStage)
	}
	return &Pipeline{
		logger:    log.With(logger, "component", "pipeline"),
		stages:    st,
		jobName:   jobName,
		dropCount: getDropCountMetric(registerer),
	}, nil
}
// RunWith reads entries from the input channel, applies the process function
// to each one, and forwards the result on the returned output channel. The
// output channel is closed once input is drained.
func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry {
	output := make(chan Entry)
	go func() {
		defer close(output)
		for entry := range input {
			output <- process(entry)
		}
	}()
	return output
}
// RunWithSkip is like RunWith, except that an entry is silently discarded
// whenever the process function reports skip=true for it.
func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry {
	output := make(chan Entry)
	go func() {
		defer close(output)
		for entry := range input {
			if processed, skip := process(entry); !skip {
				output <- processed
			}
		}
	}()
	return output
}
// RunWithSkipOrSendMany is like RunWithSkip, except process maps one input
// entry to zero or more output entries: when skip is true nothing is sent;
// otherwise every returned entry is forwarded on the output channel.
func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry {
	output := make(chan Entry)
	go func() {
		defer close(output)
		for entry := range input {
			results, skip := process(entry)
			if skip {
				continue
			}
			for _, result := range results {
				output <- result
			}
		}
	}()
	return output
}
// Run implements Stage.
//
// It first seeds each entry's Extracted map from its labels (so stages can
// read initial labels such as "filename" like any other extracted value),
// then threads the stream through every configured stage in order.
func (p *Pipeline) Run(in chan Entry) chan Entry {
	current := RunWith(in, func(e Entry) Entry {
		for name, value := range e.Labels {
			e.Extracted[string(name)] = string(value)
		}
		return e
	})
	// Chain the stages: each consumes the previous stage's output channel.
	for _, stage := range p.stages {
		current = stage.Run(current)
	}
	return current
}
// Name implements Stage; a pipeline identifies itself by the generic
// pipeline stage type.
func (p *Pipeline) Name() string {
	return StageTypePipeline
}
// Wrap implements EntryMiddleware
//
// It splices the pipeline between an upstream producer and `next`: entries
// sent to the returned handler's channel are run through every stage, then
// forwarded to next's channel (subject to the global rate limiter). The
// returned handler's stop function closes the input exactly once and waits
// for both forwarding goroutines to finish draining.
func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
	handlerIn := make(chan loki.Entry)
	nextChan := next.Chan()
	// once guards the close so a double Stop doesn't panic on a closed channel.
	wg, once := sync.WaitGroup{}, sync.Once{}
	pipelineIn := make(chan Entry)
	pipelineOut := p.Run(pipelineIn)
	wg.Add(2)
	// Downstream goroutine: forward processed entries to next, applying the
	// global rate limiter (if configured via SetReadLineRateLimiter).
	go func() {
		defer wg.Done()
		for e := range pipelineOut {
			if rateLimiter != nil {
				if rateLimiterDrop {
					// Drop mode: discard over-limit entries and count them.
					if !rateLimiter.Allow() {
						p.dropCount.WithLabelValues(rateLimiterDropReason).Inc()
						continue
					}
				} else {
					// Block mode: wait for a token. Error deliberately
					// ignored — Background() never cancels, so Wait can only
					// fail on an infeasible limiter configuration.
					_ = rateLimiter.Wait(context.Background())
				}
			}
			nextChan <- e.Entry
		}
	}()
	// Upstream goroutine: wrap each incoming loki entry with a fresh Extracted
	// map and feed the pipeline. Closing pipelineIn when handlerIn closes
	// shuts down the stage chain, which in turn closes pipelineOut.
	go func() {
		defer wg.Done()
		defer close(pipelineIn)
		for e := range handlerIn {
			pipelineIn <- Entry{
				Extracted: map[string]interface{}{},
				Entry:     e,
			}
		}
	}()
	return loki.NewEntryHandler(handlerIn, func() {
		once.Do(func() { close(handlerIn) })
		wg.Wait()
	})
}
// Size gets the current number of stages in the pipeline.
func (p *Pipeline) Size() int {
	count := len(p.stages)
	return count
}
// SetReadLineRateLimiter configures the process-wide rate limiter consulted
// by (*Pipeline).Wrap for every entry leaving any pipeline. rateVal is the
// sustained rate in events per second, burstVal the burst size. When drop is
// true, over-limit entries are discarded (and counted in the drop metric);
// otherwise the forwarding goroutine blocks until the limiter allows them.
//
// NOTE(review): this writes package globals that Wrap's goroutines read with
// no synchronization — calling it while pipelines are running is a data race;
// confirm it is only invoked during (re)configuration before traffic flows.
func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool) {
	rateLimiter = rate.NewLimiter(rate.Limit(rateVal), burstVal)
	rateLimiterDrop = drop
}