forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
inputs.go
121 lines (104 loc) · 3.66 KB
/
inputs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package bundle
import (
"fmt"
"sort"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/lib/input"
"github.com/dafanshu/benthos/v3/lib/types"
)
// AllInputs is the package-level input set. It starts out empty, with its
// spec map already allocated so that lookups and additions work immediately.
var AllInputs = &InputSet{specs: make(map[string]inputSpec)}
//------------------------------------------------------------------------------
// InputAdd registers an input with this environment. The constructor and its
// documentation spec are handed to the environment's input set, and whatever
// error that set reports is returned unchanged.
func (e *Environment) InputAdd(constructor InputConstructor, spec docs.ComponentSpec) error {
	err := e.inputs.Add(constructor, spec)
	return err
}
// InputInit attempts to build an input from conf by delegating to the
// environment's input set. All arguments are forwarded as given, and the
// resulting input and error are returned unchanged.
func (e *Environment) InputInit(hasBatchProc bool, conf input.Config, mgr NewManagement, pipelines ...types.PipelineConstructorFunc) (types.Input, error) {
	return e.inputs.Init(hasBatchProc, conf, mgr, pipelines...)
}
// InputDocs returns the documentation specs of the inputs known to this
// environment, exactly as reported by the environment's input set.
func (e *Environment) InputDocs() []docs.ComponentSpec {
	specs := e.inputs.Docs()
	return specs
}
//------------------------------------------------------------------------------
// InputConstructorFromSimple adapts a simple constructor, one that only needs
// the input config and a manager, into a full InputConstructor.
//
// The returned constructor ignores its leading bool argument. It calls fn
// first; if fn fails, the error is wrapped with the configured input type and
// returned with a nil input. Otherwise the pipeline constructor list is
// extended via input.AppendProcessorsFromConfig and the input is wrapped with
// the resulting list via input.WrapWithPipelines.
func InputConstructorFromSimple(fn func(input.Config, NewManagement) (input.Type, error)) InputConstructor {
	return func(_ bool, conf input.Config, mgr NewManagement, pipelines ...types.PipelineConstructorFunc) (input.Type, error) {
		base, err := fn(conf, mgr)
		if err != nil {
			return nil, fmt.Errorf("failed to create input '%v': %w", conf.Type, err)
		}

		// The processor constructors are derived only after the base input has
		// been created successfully.
		withProcs := input.AppendProcessorsFromConfig(conf, mgr, mgr.Logger(), mgr.Metrics(), pipelines...)
		return input.WrapWithPipelines(base, withProcs...)
	}
}
//------------------------------------------------------------------------------
// InputConstructor is the function signature used to construct an input
// component. The parameter names are documentation only and mirror the
// arguments passed by InputSet.Init: a flag, the input config, a manager, and
// zero or more pipeline constructors.
type InputConstructor func(
	hasBatchProc bool,
	conf input.Config,
	mgr NewManagement,
	pipelines ...types.PipelineConstructorFunc,
) (input.Type, error)
// inputSpec pairs an input constructor with the documentation spec it was
// registered with. InputSet stores one of these per input name.
type inputSpec struct {
// constructor builds the input; it is invoked by InputSet.Init.
constructor InputConstructor
// spec is the documentation returned by InputSet.Docs and InputSet.DocsFor.
spec docs.ComponentSpec
}
// InputSet contains an explicit set of inputs available to a Benthos service.
// Inputs are registered with Add, constructed with Init, and their
// documentation is read back through Docs and DocsFor.
type InputSet struct {
// specs maps an input's spec name to its constructor and documentation.
// It may be nil on a zero-value InputSet; Add allocates it on first use.
specs map[string]inputSpec
}
// Add registers an input in this set under spec.Name, storing both its
// constructor and its documentation. An existing entry with the same name is
// overwritten. The spec is also passed to docs.RegisterDocs.
//
// The spec map is allocated on first use, so Add works on a zero-value
// InputSet. The returned error is currently always nil.
func (s *InputSet) Add(constructor InputConstructor, spec docs.ComponentSpec) error {
	entry := inputSpec{constructor: constructor, spec: spec}
	if s.specs == nil {
		s.specs = map[string]inputSpec{spec.Name: entry}
	} else {
		s.specs[spec.Name] = entry
	}
	docs.RegisterDocs(spec)
	return nil
}
// Init attempts to construct the input named by conf.Type.
//
// If the type is registered in this set, its constructor is called with the
// given arguments. Otherwise the deprecated plugin registry is consulted via
// input.GetDeprecatedPlugin. If neither knows the type, the result is a nil
// input and types.ErrInvalidInputType.
func (s *InputSet) Init(
	hasBatchProc bool,
	conf input.Config,
	mgr NewManagement,
	pipelines ...types.PipelineConstructorFunc,
) (types.Input, error) {
	if registered, ok := s.specs[conf.Type]; ok {
		return registered.constructor(hasBatchProc, conf, mgr, pipelines...)
	}

	// TODO: V4 Remove this
	if legacyCtor, ok := input.GetDeprecatedPlugin(conf.Type); ok {
		return legacyCtor(hasBatchProc, conf, mgr, mgr.Logger(), mgr.Metrics(), pipelines...)
	}

	return nil, types.ErrInvalidInputType
}
// Docs returns the documentation spec of every input in this set, sorted by
// name in ascending order. Map iteration order is random, so the sort is what
// makes the output deterministic. The result is nil when the set is empty.
func (s *InputSet) Docs() []docs.ComponentSpec {
	if len(s.specs) == 0 {
		return nil
	}

	// The local is deliberately not called "docs": that name would shadow the
	// imported docs package for the rest of the function. The final length is
	// known up front, so the slice is allocated once.
	specs := make([]docs.ComponentSpec, 0, len(s.specs))
	for _, entry := range s.specs {
		specs = append(specs, entry.spec)
	}

	sort.Slice(specs, func(i, j int) bool {
		return specs[i].Name < specs[j].Name
	})
	return specs
}
// DocsFor looks up the documentation spec registered under name. The boolean
// reports whether such an input exists; when it does not, the returned spec
// is the zero value.
func (s *InputSet) DocsFor(name string) (docs.ComponentSpec, bool) {
	if entry, found := s.specs[name]; found {
		return entry.spec, true
	}
	return docs.ComponentSpec{}, false
}