forked from benthosdev/benthos
/
config_input.go
96 lines (81 loc) · 2.61 KB
/
config_input.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package service
import (
"fmt"
"strconv"
"strings"
"gopkg.in/yaml.v3"
"github.com/dafanshu/benthos/v4/internal/component/input"
"github.com/dafanshu/benthos/v4/internal/docs"
)
// NewInputField creates a config field describing a single input component.
// After the config has been parsed, the input can be obtained as an OwnedInput
// by calling the ParsedConfig method FieldInput.
func NewInputField(name string) *ConfigField {
	inputSpec := docs.FieldInput(name, "")
	return &ConfigField{field: inputSpec}
}
// FieldInput reads the field at the given path, which is expected to have been
// declared with NewInputField, and constructs an OwnedInput from it.
//
// An error is returned when the field is absent from the parsed config, when
// its stored value is not a YAML node, when that node cannot be decoded into an
// input config, or when construction of the input itself fails.
func (p *ParsedConfig) FieldInput(path ...string) (*OwnedInput, error) {
	raw, found := p.field(path...)
	if !found {
		return nil, fmt.Errorf("field '%v' was not found in the config", strings.Join(path, "."))
	}

	var conf input.Config
	switch node := raw.(type) {
	case *yaml.Node:
		if err := node.Decode(&conf); err != nil {
			return nil, err
		}
	default:
		return nil, fmt.Errorf("unexpected value, expected object, got %T", raw)
	}

	// Construct the input from a manager scoped to the field's path.
	scopedMgr := p.mgr.IntoPath(path...)
	built, err := scopedMgr.NewInput(conf)
	if err != nil {
		return nil, err
	}
	return &OwnedInput{built}, nil
}
// NewInputListField creates a config field describing an array of input
// components. After the config has been parsed, the inputs can be obtained as
// a slice of OwnedInput by calling the ParsedConfig method FieldInputList.
func NewInputListField(name string) *ConfigField {
	listSpec := docs.FieldInput(name, "").Array()
	return &ConfigField{field: listSpec}
}
// FieldInputList reads the field at the given path, which is expected to have
// been declared with NewInputListField, and constructs one OwnedInput per
// element of the list.
//
// An error is returned when the field is absent from the parsed config, when
// its stored value is not an array, when any element is not a YAML node or
// cannot be decoded into an input config, or when construction of any input
// fails. Errors relating to a specific element include its index.
func (p *ParsedConfig) FieldInputList(path ...string) ([]*OwnedInput, error) {
	raw, found := p.field(path...)
	if !found {
		return nil, fmt.Errorf("field '%v' was not found in the config", strings.Join(path, "."))
	}

	items, isList := raw.([]any)
	if !isList {
		return nil, fmt.Errorf("unexpected value, expected array, got %T", raw)
	}

	// Decode every element up front so that a malformed entry is reported
	// before any input is constructed.
	confs := make([]input.Config, len(items))
	for idx := range items {
		node, isNode := items[idx].(*yaml.Node)
		if !isNode {
			return nil, fmt.Errorf("value %v returned unexpected value, expected object, got %T", idx, items[idx])
		}
		if err := node.Decode(&confs[idx]); err != nil {
			return nil, fmt.Errorf("value %v: %w", idx, err)
		}
	}

	// Each input is built from a manager scoped to the list path followed by
	// the element's index.
	listMgr := p.mgr.IntoPath(path...)
	owned := make([]*OwnedInput, 0, len(confs))
	for idx, conf := range confs {
		built, err := listMgr.IntoPath(strconv.Itoa(idx)).NewInput(conf)
		if err != nil {
			return nil, fmt.Errorf("input %v: %w", idx, err)
		}
		owned = append(owned, &OwnedInput{built})
	}
	return owned, nil
}