forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconstructor.go
188 lines (156 loc) · 6.08 KB
/
constructor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright (c) 2014 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package buffer
import (
"bytes"
"encoding/json"
"fmt"
"sort"
"strings"
"github.com/Jeffail/benthos/lib/buffer/single"
"github.com/Jeffail/benthos/lib/log"
"github.com/Jeffail/benthos/lib/metrics"
"github.com/Jeffail/benthos/lib/types"
"github.com/Jeffail/benthos/lib/util/config"
yaml "gopkg.in/yaml.v2"
)
//------------------------------------------------------------------------------
// TypeSpec pairs the constructor of a buffer type with the human readable
// description that is printed for it by Descriptions.
type TypeSpec struct {
	// constructor builds a buffer from the full Config, a logger and a
	// metrics aggregator. New calls it for the type selected by conf.Type.
	constructor func(conf Config, log log.Modular, stats metrics.Type) (Type, error)

	// description is the documentation text written beneath the section of
	// this buffer type in the output of Descriptions.
	description string
}
// Constructors holds the TypeSpec of every known buffer type, keyed by the
// type name. It is created empty here; New and Descriptions only read from it.
// NOTE(review): entries are presumably registered elsewhere in the package
// (registration is not visible in this file) - confirm.
var Constructors = make(map[string]TypeSpec)
//------------------------------------------------------------------------------
// Names of the buffer types. Each value is identical to the JSON/YAML key of
// the matching section in Config.
const (
	// TypeMemory is the name of the memory buffer type.
	TypeMemory = "memory"
	// TypeMMAP is the name of the memory-mapped file buffer type.
	TypeMMAP = "mmap_file"
	// TypeNone is the name of the "none" buffer type, which is the default
	// type set by NewConfig.
	TypeNone = "none"
)
//------------------------------------------------------------------------------
// Config is the umbrella configuration for buffers. It carries one section
// per buffer type plus a Type field naming the section that is in effect.
type Config struct {
	// Type is the key looked up in Constructors by New, and the name of the
	// section that SanitiseConfig retains.
	Type string `json:"type" yaml:"type"`

	// Memory holds the settings of the "memory" buffer type.
	Memory single.MemoryConfig `json:"memory" yaml:"memory"`

	// Mmap holds the settings of the "mmap_file" buffer type.
	Mmap single.MmapBufferConfig `json:"mmap_file" yaml:"mmap_file"`

	// None is the (empty) section of the "none" buffer type.
	None struct{} `json:"none" yaml:"none"`
}
// NewConfig builds a Config in which every section carries the defaults of its
// buffer type and the selected type is "none".
func NewConfig() Config {
	var conf Config

	conf.Type = "none"
	// Each section is populated by the default constructor of its own type.
	conf.Memory = single.NewMemoryConfig()
	conf.Mmap = single.NewMmapBufferConfig()
	conf.None = struct{}{}

	return conf
}
// SanitiseConfig returns a sanitised version of the Config, meaning sections
// that aren't relevant to behaviour are removed.
//
// The result is a config.Sanitised map holding the "type" field and, when one
// exists, the single section whose key equals conf.Type. If conf.Type does not
// name any section of the config (for example it is empty or unknown) only the
// "type" field is returned, rather than an extra entry with a nil value.
//
// An error is returned only if the config cannot be marshalled to JSON or the
// resulting JSON cannot be read back into a generic map.
func SanitiseConfig(conf Config) (interface{}, error) {
	// Round-trip through JSON to obtain a generic map keyed by the JSON field
	// names, so that the relevant section can be picked out by its type name.
	cBytes, err := json.Marshal(conf)
	if err != nil {
		return nil, err
	}

	hashMap := map[string]interface{}{}
	if err = json.Unmarshal(cBytes, &hashMap); err != nil {
		return nil, err
	}

	outputMap := config.Sanitised{}
	outputMap["type"] = hashMap["type"]

	// Copy the section only if the config actually has one under this name;
	// indexing a missing key would otherwise add a spurious nil entry.
	if section, exists := hashMap[conf.Type]; exists {
		outputMap[conf.Type] = section
	}
	return outputMap, nil
}
//------------------------------------------------------------------------------
// header is the static Markdown introduction of the buffers document. It is
// written verbatim by Descriptions directly after the document title and
// before the generated contents list, so all whitespace inside the raw string
// literal is part of the emitted text.
var header = "This document was generated with `benthos --list-buffers`" + `
Buffers can solve a number of typical streaming problems and are worth
considering if you face circumstances similar to the following:
- Input sources can periodically spike beyond the capacity of your output sinks.
- You want to use parallel [processing pipelines](../pipeline.md).
- You have more outputs than inputs and wish to distribute messages across them
in order to maximize overall throughput.
- Your input source needs occasional protection against back pressure from your
sink, e.g. during restarts. Please keep in mind that all buffers have an
eventual limit.
If you believe that a problem you have would be solved by a buffer the next step
is to choose an implementation based on the throughput and delivery guarantees
you need. In order to help here are some simplified tables outlining the
different options and their qualities:
#### Performance
| Type | Throughput | Consumers | Capacity |
| --------- | ---------- | --------- | -------- |
| Memory | Highest | Parallel | RAM |
| Mmap File | High | Single | Disk |
#### Delivery Guarantees
| Type | On Restart | On Crash | On Disk Corruption |
| --------- | ---------- | --------- | ------------------ |
| Memory | Lost | Lost | Lost |
| Mmap File | Persisted | Lost | Lost |`
// Descriptions renders a Markdown document describing every buffer type found
// in Constructors. The document consists of a title, the static header text, a
// numbered contents list and then one section per type containing an example
// YAML config (when one could be produced) followed by the type description.
// Types are listed in alphabetical order.
func Descriptions() string {
	// Map iteration order is random, so sort the names to keep the generated
	// document stable between runs.
	sorted := make([]string, 0, len(Constructors))
	for typeName := range Constructors {
		sorted = append(sorted, typeName)
	}
	sort.Strings(sorted)

	var doc bytes.Buffer

	// Title with its underline, followed by the static introduction.
	doc.WriteString("Buffers\n" + strings.Repeat("=", 7) + "\n\n")
	doc.WriteString(header + "\n\n")

	// Numbered contents list linking to the section of each type.
	doc.WriteString("### Contents\n\n")
	for idx, typeName := range sorted {
		fmt.Fprintf(&doc, "%v. [`%v`](#%v)\n", idx+1, typeName, typeName)
	}
	doc.WriteString("\n")

	// One section per buffer type.
	last := len(sorted) - 1
	for idx, typeName := range sorted {
		doc.WriteString("## `" + typeName + "`\n")

		// Build the example from a default config with this type selected.
		example := NewConfig()
		example.Type = typeName

		var exampleYAML []byte
		if sanitised, err := SanitiseConfig(example); err == nil {
			// Best effort: the marshal error is deliberately discarded, and
			// the example block is skipped below if no bytes were produced.
			exampleYAML, _ = yaml.Marshal(sanitised)
		}
		if exampleYAML != nil {
			doc.WriteString("\n``` yaml\n")
			doc.Write(exampleYAML)
			doc.WriteString("```\n")
		}

		doc.WriteString(Constructors[typeName].description)

		// Separate sections with a blank line, but do not trail the last one.
		if idx != last {
			doc.WriteString("\n\n")
		}
	}
	return doc.String()
}
// New constructs the buffer selected by conf.Type by delegating to the
// constructor registered under that name in Constructors. The logger and
// metrics aggregator are passed through to that constructor unchanged.
//
// If no constructor is registered for conf.Type, a nil buffer and
// types.ErrInvalidBufferType are returned.
func New(conf Config, log log.Modular, stats metrics.Type) (Type, error) {
	spec, found := Constructors[conf.Type]
	if !found {
		return nil, types.ErrInvalidBufferType
	}
	return spec.constructor(conf, log, stats)
}
//------------------------------------------------------------------------------