-
Notifications
You must be signed in to change notification settings - Fork 793
/
constructor.go
251 lines (207 loc) · 7.5 KB
/
constructor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
package buffer
import (
"bytes"
"encoding/json"
"fmt"
"sort"
"strings"
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/Jeffail/benthos/v3/lib/util/config"
yaml "gopkg.in/yaml.v3"
)
//------------------------------------------------------------------------------
// TypeSpec is a constructor and usage description for each buffer type.
type TypeSpec struct {
constructor func(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
sanitiseConfigFunc func(conf Config) (interface{}, error)
Summary string
Description string
Footnotes string
FieldSpecs docs.FieldSpecs
Beta bool
Deprecated bool
}
// Constructors is a map of all buffer types with their specs.
var Constructors = map[string]TypeSpec{}
//------------------------------------------------------------------------------
// String constants representing each buffer type.
const (
TypeMemory = "memory"
TypeNone = "none"
)
//------------------------------------------------------------------------------
// Config is the all encompassing configuration struct for all buffer types.
type Config struct {
Type string `json:"type" yaml:"type"`
Memory MemoryConfig `json:"memory" yaml:"memory"`
None struct{} `json:"none" yaml:"none"`
}
// NewConfig returns a configuration struct fully populated with default values.
func NewConfig() Config {
return Config{
Type: "none",
Memory: NewMemoryConfig(),
None: struct{}{},
}
}
// SanitiseConfig returns a sanitised version of the Config, meaning sections
// that aren't relevant to behaviour are removed.
func SanitiseConfig(conf Config) (interface{}, error) {
return conf.Sanitised(false)
}
// Sanitised returns a sanitised version of the config, meaning sections that
// aren't relevant to behaviour are removed. Also optionally removes deprecated
// fields.
func (conf Config) Sanitised(removeDeprecated bool) (interface{}, error) {
cBytes, err := json.Marshal(conf)
if err != nil {
return nil, err
}
hashMap := map[string]interface{}{}
if err = json.Unmarshal(cBytes, &hashMap); err != nil {
return nil, err
}
outputMap := config.Sanitised{}
outputMap["type"] = hashMap["type"]
if sfunc := Constructors[conf.Type].sanitiseConfigFunc; sfunc != nil {
if outputMap[conf.Type], err = sfunc(conf); err != nil {
return nil, err
}
} else {
if _, exists := hashMap[conf.Type]; exists {
outputMap[conf.Type] = hashMap[conf.Type]
}
}
t := conf.Type
def := Constructors[t]
if removeDeprecated {
if m, ok := outputMap[t].(map[string]interface{}); ok {
for _, spec := range def.FieldSpecs {
if spec.Deprecated {
delete(m, spec.Name)
}
}
}
}
return outputMap, nil
}
//------------------------------------------------------------------------------
// UnmarshalYAML ensures that when parsing configs that are in a map or slice
// the default values are still applied.
func (conf *Config) UnmarshalYAML(value *yaml.Node) error {
type confAlias Config
aliased := confAlias(NewConfig())
if err := value.Decode(&aliased); err != nil {
return fmt.Errorf("line %v: %v", value.Line, err)
}
var raw interface{}
if err := value.Decode(&raw); err != nil {
return fmt.Errorf("line %v: %v", value.Line, err)
}
if typeCandidates := config.GetInferenceCandidates(raw); len(typeCandidates) > 0 {
var inferredType string
for _, tc := range typeCandidates {
if _, exists := Constructors[tc]; exists {
if len(inferredType) > 0 {
return fmt.Errorf("line %v: unable to infer type, multiple candidates '%v' and '%v'", value.Line, inferredType, tc)
}
inferredType = tc
}
}
if len(inferredType) == 0 {
return fmt.Errorf("line %v: unable to infer type, candidates were: %v", value.Line, typeCandidates)
}
aliased.Type = inferredType
}
if _, exists := Constructors[aliased.Type]; !exists {
return fmt.Errorf("line %v: type '%v' was not recognised", value.Line, aliased.Type)
}
*conf = Config(aliased)
return nil
}
//------------------------------------------------------------------------------
var header = "This document was generated with `benthos --list-buffers`" + `
Benthos uses a transaction based model for guaranteeing delivery of messages
without the need for a buffer. This ensures that messages are never acknowledged
from a source until the message has left the target sink.
However, sometimes the transaction model is undesired, in which case there are a
range of buffer options available which decouple input sources from the rest of
the Benthos pipeline.
Buffers can therefore solve a number of typical streaming problems but come at
the cost of weakening the delivery guarantees of your pipeline. Common problems
that might warrant use of a buffer are:
- Input sources can periodically spike beyond the capacity of your output sinks.
- You want to use parallel [processing pipelines](/docs/configuration/processing_pipelines).
- You have more outputs than inputs and wish to distribute messages across them
in order to maximize overall throughput.
- Your input source needs occasional protection against back pressure from your
sink, e.g. during restarts. Please keep in mind that all buffers have an
eventual limit.
If you believe that a problem you have would be solved by a buffer the next step
is to choose an implementation based on the throughput and delivery guarantees
you need. In order to help here are some simplified tables outlining the
different options and their qualities:
#### Performance
| Type | Throughput | Consumers | Capacity |
| --------- | ---------- | --------- | -------- |
| Memory | Highest | Parallel | RAM |
#### Delivery Guarantees
| Event | Shutdown | Crash | Disk Corruption |
| --------- | --------- | --------- | --------------- |
| Memory | Flushed\* | Lost | Lost |
\* Makes a best attempt at flushing the remaining messages before closing
gracefully.`
// Descriptions returns a formatted string of collated descriptions of each type.
func Descriptions() string {
// Order our buffer types alphabetically
names := []string{}
for name := range Constructors {
names = append(names, name)
}
sort.Strings(names)
buf := bytes.Buffer{}
buf.WriteString("Buffers\n")
buf.WriteString(strings.Repeat("=", 7))
buf.WriteString("\n\n")
buf.WriteString(header)
buf.WriteString("\n\n")
buf.WriteString("### Contents\n\n")
for i, name := range names {
buf.WriteString(fmt.Sprintf("%v. [`%v`](#%v)\n", i+1, name, name))
}
buf.WriteString("\n")
// Append each description
for i, name := range names {
var confBytes []byte
conf := NewConfig()
conf.Type = name
if confSanit, err := SanitiseConfig(conf); err == nil {
confBytes, _ = config.MarshalYAML(confSanit)
}
buf.WriteString("## ")
buf.WriteString("`" + name + "`")
buf.WriteString("\n")
if confBytes != nil {
buf.WriteString("\n``` yaml\n")
buf.Write(confBytes)
buf.WriteString("```\n")
}
buf.WriteString(Constructors[name].Description)
buf.WriteString("\n")
if i != (len(names) - 1) {
buf.WriteString("\n---\n")
}
}
return buf.String()
}
// New creates a buffer type based on a buffer configuration.
func New(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
if c, ok := Constructors[conf.Type]; ok {
return c.constructor(conf, mgr, log, stats)
}
return nil, types.ErrInvalidBufferType
}
//------------------------------------------------------------------------------