-
Notifications
You must be signed in to change notification settings - Fork 785
/
bundle.go
73 lines (64 loc) · 2.14 KB
/
bundle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package tracing
import (
"github.com/benthosdev/benthos/v4/internal/bloblang/query"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/component/output"
"github.com/benthosdev/benthos/v4/internal/component/output/processors"
"github.com/benthosdev/benthos/v4/internal/component/processor"
)
// TracedBundle clones the provided bundle environment and, for every input,
// processor and output spec documented by the original, registers a
// constructor on the clone that builds the component through the original
// environment and then wraps it with a tracing layer. The wrappers record
// their events in the returned summary, keyed by the component label or, when
// the label is empty, by "root." followed by the dot path of the component.
func TracedBundle(b *bundle.Environment) (*bundle.Environment, *Summary) {
	summary := NewSummary()
	tracedEnv := b.Clone()

	// eventKey picks the identifier under which a component's events are
	// stored: its label when set, otherwise its path within the config.
	eventKey := func(nm bundle.NewManagement) string {
		if label := nm.Label(); label != "" {
			return label
		}
		return "root." + query.SliceToDotPath(nm.Path()...)
	}

	// The constructors below do not depend on the spec being registered, so
	// each one is declared once and shared by every spec of its kind.
	newInput := func(conf input.Config, nm bundle.NewManagement) (input.Streamed, error) {
		in, err := b.InputInit(conf, nm)
		if err != nil {
			return nil, err
		}
		events, counter := summary.wInputEvents(eventKey(nm))
		return traceInput(events, counter, in), nil
	}

	newProcessor := func(conf processor.Config, nm bundle.NewManagement) (processor.V1, error) {
		proc, err := b.ProcessorInit(conf, nm)
		if err != nil {
			return nil, err
		}
		events, errCounter := summary.wProcessorEvents(eventKey(nm))
		return traceProcessor(events, errCounter, proc), nil
	}

	newOutput := func(conf output.Config, nm bundle.NewManagement, pcf ...processor.PipelineConstructorFunc) (output.Streamed, error) {
		// Move the configured processors into pcf and clear them from the
		// config handed to the original constructor; the pipelines are then
		// applied around the traced output at the end.
		// NOTE(review): presumably this keeps the trace wrapper directly on
		// the raw output rather than outside its processors — confirm.
		pcf = processors.AppendFromConfig(conf, nm, pcf...)
		conf.Processors = nil

		out, err := b.OutputInit(conf, nm)
		if err != nil {
			return nil, err
		}
		events, counter := summary.wOutputEvents(eventKey(nm))
		out = traceOutput(events, counter, out)
		return output.WrapWithPipelines(out, pcf...)
	}

	// The results of the Add calls are deliberately discarded.
	// NOTE(review): presumably re-adding a spec the clone already holds
	// cannot fail — confirm.
	for _, inputSpec := range b.InputDocs() {
		_ = tracedEnv.InputAdd(newInput, inputSpec)
	}
	for _, processorSpec := range b.ProcessorDocs() {
		_ = tracedEnv.ProcessorAdd(newProcessor, processorSpec)
	}
	for _, outputSpec := range b.OutputDocs() {
		_ = tracedEnv.OutputAdd(newOutput, outputSpec)
	}

	return tracedEnv, summary
}