-
Notifications
You must be signed in to change notification settings - Fork 804
/
constructor.go
96 lines (86 loc) · 3.09 KB
/
constructor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package pipeline
import (
"fmt"
"github.com/Jeffail/benthos/v3/internal/interop"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/processor"
"github.com/Jeffail/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
// Config is a configuration struct for creating parallel processing pipelines.
// The number of resulting parallel processing pipelines will match the number
// of threads specified. Processors are executed on each message in the order
// that they are written.
//
// In order to fully utilise each processing thread you must either have a
// number of parallel inputs that matches or surpasses the number of pipeline
// threads, or use a memory buffer.
type Config struct {
// Threads is the number of parallel processing pipelines to create.
Threads int `json:"threads" yaml:"threads"`
// Processors holds the configs of the processors to run, in order, on each
// message.
Processors []processor.Config `json:"processors" yaml:"processors"`
}
// NewConfig builds a Config carrying the default settings: a single thread and
// an empty (non-nil) list of processors.
func NewConfig() Config {
	var defaults Config
	defaults.Threads = 1
	defaults.Processors = []processor.Config{}
	return defaults
}
// SanitiseConfig produces a sanitised representation of conf, i.e. one with
// the sections that have no bearing on behaviour stripped out. Deprecated
// fields are retained; it is shorthand for conf.Sanitised(false).
func SanitiseConfig(conf Config) (interface{}, error) {
	const removeDeprecated = false
	return conf.Sanitised(removeDeprecated)
}
// Sanitised produces a sanitised representation of the config as a map with
// the keys "threads" and "processors". Each processor config is sanitised in
// turn, with removeDeprecated forwarded to it; the first error encountered
// aborts the operation and is returned as-is.
func (conf Config) Sanitised(removeDeprecated bool) (interface{}, error) {
	sanitisedProcs := make([]interface{}, 0, len(conf.Processors))
	for _, procConf := range conf.Processors {
		sanitised, err := procConf.Sanitised(removeDeprecated)
		if err != nil {
			return nil, err
		}
		sanitisedProcs = append(sanitisedProcs, sanitised)
	}

	result := map[string]interface{}{}
	result["threads"] = conf.Threads
	result["processors"] = sanitisedProcs
	return result, nil
}
//------------------------------------------------------------------------------
// New creates a processing pipeline from a pipeline configuration.
//
// Every pipeline instance runs the processors described by conf.Processors,
// in order, followed by one processor from each of the optional
// processorCtors. When conf.Threads is exactly 1 a single pipeline is built
// and returned directly; for any other value construction is handed to
// NewPool together with the thread count.
func New(
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
	processorCtors ...types.ProcessorConstructorFunc,
) (Type, error) {
	// build assembles one pipeline instance. The counter behind nextID
	// supplies the numeric suffix of each configured processor's label and is
	// advanced once per successfully created configured processor.
	build := func(nextID *int) (types.Pipeline, error) {
		total := len(conf.Processors) + len(processorCtors)
		chain := make([]types.Processor, 0, total)

		for _, pConf := range conf.Processors {
			label := fmt.Sprintf("processor.%v", *nextID)
			childMgr, childLog, childStats := interop.LabelChild(label, mgr, log, stats)

			proc, err := processor.New(pConf, childMgr, childLog, childStats)
			if err != nil {
				return nil, fmt.Errorf("failed to create processor '%v': %v", pConf.Type, err)
			}
			chain = append(chain, proc)
			*nextID++
		}

		// Constructor-supplied processors always come after the configured
		// ones and do not consume a label index.
		for _, ctor := range processorCtors {
			proc, err := ctor()
			if err != nil {
				return nil, fmt.Errorf("failed to create processor: %v", err)
			}
			chain = append(chain, proc)
		}

		return NewProcessor(log, stats, chain...), nil
	}

	if conf.Threads != 1 {
		return NewPool(build, conf.Threads, log, stats)
	}

	counter := 0
	return build(&counter)
}
//------------------------------------------------------------------------------