forked from benthosdev/benthos
/
processor_resource.go
88 lines (76 loc) · 2.33 KB
/
processor_resource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package pure
import (
"context"
"fmt"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component/processor"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/message"
)
func init() {
err := bundle.AllProcessors.Add(func(conf processor.Config, mgr bundle.NewManagement) (processor.V1, error) {
return newResourceProcessor(conf, mgr, mgr.Logger())
}, docs.ComponentSpec{
Name: "resource",
Categories: []string{
"Utility",
},
Summary: `
Resource is a processor type that runs a processor resource identified by its label.`,
Description: `
This processor allows you to reference the same configured processor resource in multiple places, and can also tidy up large nested configs. For example, the config:
` + "```yaml" + `
pipeline:
processors:
- mapping: |
root.message = this
root.meta.link_count = this.links.length()
root.user.age = this.user.age.number()
` + "```" + `
Is equivalent to:
` + "```yaml" + `
pipeline:
processors:
- resource: foo_proc
processor_resources:
- label: foo_proc
mapping: |
root.message = this
root.meta.link_count = this.links.length()
root.user.age = this.user.age.number()
` + "```" + `
You can find out more about resources [in this document.](/docs/configuration/resources)`,
Config: docs.FieldString("", "").HasDefault(""),
})
if err != nil {
panic(err)
}
}
type resourceProcessor struct {
mgr bundle.NewManagement
name string
log log.Modular
}
func newResourceProcessor(conf processor.Config, mgr bundle.NewManagement, log log.Modular) (*resourceProcessor, error) {
if !mgr.ProbeProcessor(conf.Resource) {
return nil, fmt.Errorf("processor resource '%v' was not found", conf.Resource)
}
return &resourceProcessor{
mgr: mgr,
name: conf.Resource,
log: log,
}, nil
}
func (r *resourceProcessor) ProcessBatch(ctx context.Context, msg message.Batch) (msgs []message.Batch, res error) {
if err := r.mgr.AccessProcessor(ctx, r.name, func(p processor.V1) {
msgs, res = p.ProcessBatch(ctx, msg)
}); err != nil {
r.log.Errorf("Failed to obtain processor resource '%v': %v", r.name, err)
return nil, err
}
return msgs, res
}
func (r *resourceProcessor) Close(ctx context.Context) error {
return nil
}