/
pipeline.go
124 lines (112 loc) · 3.17 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package stages
import (
"sync"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/credativ/vali/pkg/valitail/api"
)
// PipelineStages contains the configuration for each stage within a
// pipeline: one element per stage, in execution order. Each element is
// expected to be a PipelineStage (enforced in NewPipeline).
type PipelineStages = []interface{}

// PipelineStage contains the configuration for a single pipeline stage:
// a single-key map from the stage's type name to its stage-specific config.
type PipelineStage = map[interface{}]interface{}
// Pipeline pass down a log entry to each stage for mutation and/or label extraction.
type Pipeline struct {
	logger  log.Logger // logger scoped with component=pipeline (set in NewPipeline)
	stages  []Stage    // stages to run, in configuration order
	jobName *string    // optional job name forwarded to each stage's constructor
}
// NewPipeline creates a new log entry pipeline from a configuration.
//
// Each element of stgs must be a single-key map (PipelineStage) whose key
// names the stage type and whose value holds that stage's configuration.
// Returns an error if an element is not a map, has more than one key, has
// a non-string key, or if the named stage rejects its configuration.
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
	// Exactly one stage is produced per config entry, so the final length
	// is known up front — preallocate to avoid append growth copies.
	st := make([]Stage, 0, len(stgs))
	for _, s := range stgs {
		stage, ok := s.(PipelineStage)
		if !ok {
			return nil, errors.Errorf("invalid YAML config, "+
				"make sure each stage of your pipeline is a YAML object (must end with a `:`), check stage `- %s`", s)
		}
		if len(stage) > 1 {
			return nil, errors.New("pipeline stage must contain only one key")
		}
		// The map has at most one entry; the loop extracts its key/value pair.
		for key, config := range stage {
			name, ok := key.(string)
			if !ok {
				return nil, errors.New("pipeline stage key must be a string")
			}
			// Instantiate the concrete stage implementation by name.
			newStage, err := New(logger, jobName, name, config, registerer)
			if err != nil {
				return nil, errors.Wrapf(err, "invalid %s stage config", name)
			}
			st = append(st, newStage)
		}
	}
	return &Pipeline{
		logger:  log.With(logger, "component", "pipeline"),
		stages:  st,
		jobName: jobName,
	}, nil
}
// RunWith reads entries from the input channel, transforms each one with
// the process function, and forwards the result on the returned channel.
// The returned channel is closed once input has been closed and drained.
func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry {
	output := make(chan Entry)
	go func() {
		defer close(output)
		for entry := range input {
			output <- process(entry)
		}
	}()
	return output
}
// Run implements Stage. It seeds each entry's extracted map with the
// entry's initial labels and then chains every configured stage in order,
// returning the final stage's output channel.
func (p *Pipeline) Run(in chan Entry) chan Entry {
	// Copy the initial labels (e.g. "filename") into the extracted map so
	// that downstream stages can operate on them as well.
	out := RunWith(in, func(e Entry) Entry {
		for name, value := range e.Labels {
			e.Extracted[string(name)] = string(value)
		}
		return e
	})
	// Wire the stages together: each stage consumes the previous output.
	for _, stage := range p.stages {
		out = stage.Run(out)
	}
	return out
}
// Name implements Stage by reporting the pipeline stage type identifier.
func (p *Pipeline) Name() string {
	return StageTypePipeline
}
// Wrap implements EntryMiddleware. It returns a handler that feeds every
// received entry through the pipeline before passing it on to next.
//
// Two goroutines bridge the api.Entry world and the pipeline's Entry world:
// one converts incoming api.Entry values into pipeline Entry values (with a
// fresh extracted map) and feeds them in; the other drains the pipeline's
// output back into next's channel. The returned handler's stop function
// closes the inbound channel exactly once and waits for both goroutines to
// finish, which guarantees all in-flight entries are flushed to next.
func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
	handlerIn := make(chan api.Entry)
	nextChan := next.Chan()
	wg, once := sync.WaitGroup{}, sync.Once{}
	pipelineIn := make(chan Entry)
	pipelineOut := p.Run(pipelineIn)
	wg.Add(2)
	// Drain the pipeline output into the next handler. Exits when
	// pipelineOut is closed, i.e. after pipelineIn closes and every stage
	// has flushed.
	go func() {
		defer wg.Done()
		for e := range pipelineOut {
			nextChan <- e.Entry
		}
	}()
	// Convert incoming api entries into pipeline entries. Closing
	// pipelineIn on exit propagates shutdown through the stage chain.
	go func() {
		defer wg.Done()
		defer close(pipelineIn)
		for e := range handlerIn {
			pipelineIn <- Entry{
				Extracted: map[string]interface{}{},
				Entry:     e,
			}
		}
	}()
	return api.NewEntryHandler(handlerIn, func() {
		// once guards against a double close if stop is called twice.
		once.Do(func() { close(handlerIn) })
		wg.Wait()
	})
}
// Size reports how many stages this pipeline currently contains.
func (p *Pipeline) Size() int {
	return len(p.stages)
}