/
input_generate.go
278 lines (249 loc) · 8.01 KB
/
input_generate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
package pure
import (
"context"
"fmt"
"strings"
"time"
"github.com/robfig/cron/v3"
"github.com/benthosdev/benthos/v4/internal/bloblang/mapping"
"github.com/benthosdev/benthos/v4/internal/bloblang/parser"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/component/interop"
"github.com/benthosdev/benthos/v4/internal/message"
"github.com/benthosdev/benthos/v4/public/service"
)
// Names of the configuration fields read by the generate input. They are used
// both when declaring the config spec and when reading the parsed config.
const (
// giFieldMapping names the field holding the Bloblang mapping source.
giFieldMapping = "mapping"
// giFieldInterval names the field holding the duration or cron expression.
giFieldInterval = "interval"
// giFieldCount names the field holding the optional message limit.
giFieldCount = "count"
// giFieldBatchSize names the field holding the per-batch message count.
giFieldBatchSize = "batch_size"
)
// genInputSpec returns the configuration spec for the generate input.
//
// The spec is marked stable, placed in the "Utility" category and versioned
// 3.40.0. It declares the mapping, interval (default "1s"), count (default 0)
// and batch_size (default 1) fields plus the auto-retry-nacks toggle, and
// attaches two documentation examples.
func genInputSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Stable().
Categories("Utility").
Version("3.40.0").
Summary("Generates messages at a given interval using a [Bloblang](/docs/guides/bloblang/about) mapping executed without a context. This allows you to generate messages for testing your pipeline configs.").
Fields(
// The Bloblang mapping used to produce each message.
service.NewBloblangField(giFieldMapping).
Description("A [bloblang](/docs/guides/bloblang/about) mapping to use for generating messages.").
Examples(
`root = "hello world"`,
`root = {"test":"message","id":uuid_v4()}`,
),
// Either a duration string or a cron expression; empty means unthrottled.
service.NewStringField(giFieldInterval).
Description("The time interval at which messages should be generated, expressed either as a duration string or as a cron expression. If set to an empty string messages will be generated as fast as downstream services can process them. Cron expressions can specify a timezone by prefixing the expression with `TZ=<location name>`, where the location name corresponds to a file within the IANA Time Zone database.").
Examples(
"5s", "1m", "1h",
"@every 1s", "0,30 */2 * * * *", "TZ=Europe/London 30 3-6,20-23 * * *",
).Default("1s"),
// Zero (the default) means no limit on the number of messages.
service.NewIntField(giFieldCount).
Description("An optional number of messages to generate, if set above 0 the specified number of messages is generated and then the input will shut down.").
Default(0),
service.NewIntField(giFieldBatchSize).
Description("The number of generated messages that should be accumulated into each batch flushed at the specified interval.").
Default(1),
service.NewAutoRetryNacksToggleField(),
).
// NOTE(review): the YAML bodies of the two examples below carry no
// indentation in this copy of the file; confirm they match upstream before
// relying on them as valid configs.
Example("Cron Scheduled Processing", "A common use case for the generate input is to trigger processors on a schedule so that the processors themselves can behave similarly to an input. The following configuration reads rows from a PostgreSQL table every 5 minutes.", `
input:
generate:
interval: '@every 5m'
mapping: 'root = {}'
processors:
- sql_select:
driver: postgres
dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
table: foo
columns: [ "*" ]
`).
Example("Generate 100 Rows", "The generate input can be used as a convenient way to generate test data. The following example generates 100 rows of structured data by setting an explicit count. The interval field is set to empty, which means data is generated as fast as the downstream components can consume it.", `
input:
generate:
count: 100
interval: ""
mapping: |
root = if random_int() % 2 == 0 {
{
"type": "foo",
"foo": "is yummy"
}
} else {
{
"type": "bar",
"bar": "is gross"
}
}
`)
}
// init registers the "generate" batch input, using genInputSpec as its config
// spec. It panics if registration fails.
//
// The registered constructor builds a generateReader from the parsed config
// and passes it to input.NewAsyncReader. The reader is wrapped in an
// input.NewAsyncPreserver only when the auto-retry-nacks toggle is enabled;
// previously it was wrapped unconditionally (and twice when the toggle was
// on), which made the toggle ineffective.
func init() {
	err := service.RegisterBatchInput("generate", genInputSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) {
		nm := interop.UnwrapManagement(mgr)

		// Read the toggle before constructing the reader so that a config
		// error cannot leave a freshly started ticker behind.
		autoRetry, err := conf.FieldBool(service.AutoRetryNacksToggleFieldName)
		if err != nil {
			return nil, err
		}

		var b input.Async
		if b, err = newGenerateReaderFromParsed(conf, nm); err != nil {
			return nil, err
		}
		if autoRetry {
			b = input.NewAsyncPreserver(b)
		}

		i, err := input.NewAsyncReader("generate", b, nm)
		if err != nil {
			return nil, err
		}
		return interop.NewUnwrapInternalInput(i), nil
	})
	if err != nil {
		panic(err)
	}
}
//------------------------------------------------------------------------------
// generateReader is the input.Async implementation behind the generate input.
// It produces batches by repeatedly running a Bloblang mapping, optionally
// paced by a ticker and optionally capped at a fixed number of messages.
type generateReader struct {
	// exec is the compiled mapping run once per generated message.
	exec *mapping.Executor

	// batchSize is the target number of messages per batch.
	batchSize int
	// limited reports whether a message cap is in force; remaining is the
	// number of messages still allowed under that cap.
	limited   bool
	remaining int

	// firstIsFree lets the very first read skip waiting on the ticker.
	firstIsFree bool
	// timer paces reads; it is nil when no interval applies.
	timer *time.Ticker
	// schedule is set only when the interval is a cron expression, and
	// schedulePrev then holds the most recently scheduled fire time.
	schedule     *cron.Schedule
	schedulePrev *time.Time
}
// newGenerateReaderFromParsed builds a generateReader from a parsed generate
// input config, using mgr's Bloblang environment to compile the mapping.
//
// The interval field is interpreted as follows:
//   - empty: no ticker is created, so reads are never throttled;
//   - a string accepted by time.ParseDuration: a ticker with that period is
//     created when the duration is positive, and the first read does not wait;
//   - anything else: parsed as a cron expression via parseCronExpression. The
//     first read waits for the first scheduled time, and the ticker is
//     initially armed for the time remaining until then (minimum 1ns).
//
// An error is returned when a field cannot be read, when the interval is
// neither a duration nor a cron expression, when the mapping fails to parse,
// or when batch_size is less than one (a negative size would otherwise panic
// when allocating the batch, and zero could never yield a message).
//
// The ticker is started only after every fallible step has succeeded, so an
// error return never leaks a running ticker.
func newGenerateReaderFromParsed(conf *service.ParsedConfig, mgr bundle.NewManagement) (*generateReader, error) {
	mappingStr, err := conf.FieldString(giFieldMapping)
	if err != nil {
		return nil, err
	}

	intervalStr, err := conf.FieldString(giFieldInterval)
	if err != nil {
		return nil, err
	}

	var (
		duration     time.Duration
		schedule     *cron.Schedule
		schedulePrev *time.Time
		firstIsFree  = true
	)
	if intervalStr != "" {
		var derr error
		if duration, derr = time.ParseDuration(intervalStr); derr != nil {
			// interval is not a duration so try to parse as a cron expression
			var cerr error
			if schedule, cerr = parseCronExpression(intervalStr); cerr != nil {
				return nil, fmt.Errorf("failed to parse interval as duration string: %v, or as cron expression: %w", derr, cerr)
			}
			// Cron schedules fire at fixed times, so even the first read must
			// wait for the first scheduled time.
			firstIsFree = false
			tNext := (*schedule).Next(time.Now())
			schedulePrev = &tNext
		}
	}

	exec, err := mgr.BloblEnvironment().NewMapping(mappingStr)
	if err != nil {
		if perr, ok := err.(*parser.Error); ok {
			return nil, fmt.Errorf("failed to parse mapping: %v", perr.ErrorAtPosition([]rune(mappingStr)))
		}
		return nil, fmt.Errorf("failed to parse mapping: %v", err)
	}

	count, err := conf.FieldInt(giFieldCount)
	if err != nil {
		return nil, err
	}

	batchSize, err := conf.FieldInt(giFieldBatchSize)
	if err != nil {
		return nil, err
	}
	if batchSize < 1 {
		return nil, fmt.Errorf("field %v must be greater than zero, got %v", giFieldBatchSize, batchSize)
	}

	// Everything that can fail has been done; it is now safe to start the
	// ticker without risking a leak.
	var timer *time.Ticker
	if schedulePrev != nil {
		// Ticker periods must be positive, so clamp to the smallest duration
		// when the first scheduled time is already due.
		if duration = time.Until(*schedulePrev); duration < 1 {
			duration = 1
		}
	}
	if duration > 0 {
		timer = time.NewTicker(duration)
	}

	return &generateReader{
		exec:         exec,
		remaining:    count,
		batchSize:    batchSize,
		limited:      count > 0,
		timer:        timer,
		schedule:     schedule,
		schedulePrev: schedulePrev,
		firstIsFree:  firstIsFree,
	}, nil
}
// parseCronExpression parses cronExpression into a cron schedule. Expressions
// that do not start with "TZ=" are evaluated in UTC. The parser accepts an
// optional leading seconds field as well as descriptors.
func parseCronExpression(cronExpression string) (*cron.Schedule, error) {
	expr := cronExpression
	if !strings.HasPrefix(expr, "TZ=") {
		// No explicit time zone was supplied, so pin the schedule to UTC.
		expr = fmt.Sprintf("TZ=%s %s", "UTC", expr)
	}

	fields := cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor
	sched, err := cron.NewParser(fields).Parse(expr)
	if err != nil {
		return nil, err
	}
	return &sched, nil
}
// Connect is a no-op: it performs no work and always reports success.
func (b *generateReader) Connect(_ context.Context) error {
	return nil
}
// ReadBatch waits for the next tick (when a ticker is configured and this is
// not a free first read) and then generates a batch by calling the mapping
// executor up to batch_size times.
//
// Returns:
//   - component.ErrTypeClosed once a configured message count is exhausted,
//     or if the ticker channel is closed;
//   - component.ErrTimeout if ctx is done while waiting for a tick, or if the
//     mapping produced no parts at all;
//   - any error returned by the mapping executor.
//
// On success the returned acknowledgement function is a no-op.
//
// The remaining-message counter is only reduced once a batch is about to be
// returned. Previously it was reduced part by part, so a mapping error midway
// through a batch discarded the parts already built while still counting
// them against the configured limit.
func (b *generateReader) ReadBatch(ctx context.Context) (message.Batch, input.AsyncAckFn, error) {
	batchSize := b.batchSize
	if b.limited {
		if b.remaining <= 0 {
			return nil, nil, component.ErrTypeClosed
		}
		// Never generate more than what is left under the limit.
		if b.remaining < batchSize {
			batchSize = b.remaining
		}
	}

	if !b.firstIsFree && b.timer != nil {
		select {
		case t, ok := <-b.timer.C:
			if !ok {
				return nil, nil, component.ErrTypeClosed
			}
			if b.schedule != nil {
				// Derive the next fire time from the previously scheduled
				// time rather than from the tick time when one is recorded.
				if b.schedulePrev != nil {
					t = *b.schedulePrev
				}
				tNext := (*b.schedule).Next(t)
				duration := time.Until(tNext)
				if duration < 1 {
					// Ticker periods must be positive.
					duration = 1
				}
				b.schedulePrev = &tNext
				b.timer.Reset(duration)
			}
		case <-ctx.Done():
			return nil, nil, component.ErrTimeout
		}
	}
	b.firstIsFree = false

	batch := make(message.Batch, 0, batchSize)
	for i := 0; i < batchSize; i++ {
		p, err := b.exec.MapPart(0, batch)
		if err != nil {
			return nil, nil, err
		}
		// A nil part is skipped and does not count towards the limit.
		if p != nil {
			batch = append(batch, p)
		}
	}
	if len(batch) == 0 {
		return nil, nil, component.ErrTimeout
	}

	if b.limited {
		b.remaining -= len(batch)
	}
	return batch, func(context.Context, error) error { return nil }, nil
}
// Close stops the interval ticker, if one was created, and always returns nil.
func (b *generateReader) Close(_ context.Context) error {
	if t := b.timer; t != nil {
		t.Stop()
	}
	return nil
}