forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.go
366 lines (334 loc) · 10.1 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
package service
import (
"context"
"fmt"
"os"
"runtime/debug"
"github.com/dafanshu/benthos/v3/internal/bloblang/parser"
"github.com/dafanshu/benthos/v3/internal/cli/studio"
clitemplate "github.com/dafanshu/benthos/v3/internal/cli/template"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/internal/filepath"
"github.com/dafanshu/benthos/v3/internal/template"
"github.com/dafanshu/benthos/v3/lib/config"
"github.com/dafanshu/benthos/v3/lib/service/blobl"
"github.com/dafanshu/benthos/v3/lib/service/test"
uconfig "github.com/dafanshu/benthos/v3/lib/util/config"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v3"
// TODO: V4 Remove this as it's a temporary work around to ensure current
// plugin users automatically import all components.
_ "github.com/dafanshu/benthos/v3/public/components/legacy"
)
//------------------------------------------------------------------------------
// Build stamps.
var (
Version string
DateBuilt string
)
func init() {
if Version == "" {
if info, ok := debug.ReadBuildInfo(); ok {
for _, mod := range info.Deps {
if mod.Path == "github.com/dafanshu/benthos/v3" {
Version = mod.Version
}
}
}
}
if DateBuilt == "" {
DateBuilt = "unknown"
}
}
// OptSetVersionStamp creates an opt func for setting the version and date built
// stamps that Benthos returns via --version and the /version endpoint. The
// traditional way of setting these values is via the build flags:
// -X github.com/dafanshu/benthos/v3/lib/service.Version=$(VERSION) and
// -X github.com/dafanshu/benthos/v3/lib/service.DateBuilt=$(DATE)
func OptSetVersionStamp(version, dateBuilt string) func() {
return func() {
Version = version
DateBuilt = dateBuilt
}
}
//------------------------------------------------------------------------------
var customFlags []cli.Flag
// OptAddStringFlag registers a custom CLI flag for the standard Benthos run
// command.
func OptAddStringFlag(name, usage string, aliases []string, value string, destination *string) func() {
return func() {
customFlags = append(customFlags, &cli.StringFlag{
Name: name,
Aliases: aliases,
Value: value,
Usage: usage,
Destination: destination,
})
}
}
//------------------------------------------------------------------------------
var optContext = context.Background()
// OptUseContext sets a context to be used for cancellation during the run
// command. This adds one extra mechanism for graceful termination.
func OptUseContext(ctx context.Context) func() {
return func() {
optContext = ctx
}
}
//------------------------------------------------------------------------------
func cmdVersion() {
version, dateBuilt := Version, DateBuilt
if version == "" {
info, ok := debug.ReadBuildInfo()
if ok {
for _, mod := range info.Deps {
if mod.Path == "github.com/dafanshu/benthos/v3" {
version = mod.Version
}
}
}
}
fmt.Printf("Version: %v\nDate: %v\n", version, dateBuilt)
os.Exit(0)
}
//------------------------------------------------------------------------------
// RunWithOpts runs the Benthos service after first applying opt funcs, which
// are used for specify service customisations.
func RunWithOpts(opts ...func()) {
for _, opt := range opts {
opt()
}
Run()
}
// Run the Benthos service, if the pipeline is started successfully then this
// call blocks until either the pipeline shuts down or a termination signal is
// received.
func Run() {
flags := []cli.Flag{
&cli.BoolFlag{
Name: "version",
Aliases: []string{"v"},
Value: false,
Usage: "display version info, then exit",
},
&cli.StringFlag{
Name: "env-file",
Aliases: []string{"e"},
Value: "",
Usage: "import environment variables from a dotenv file",
},
&cli.StringFlag{
Name: "log.level",
Value: "",
Usage: "override the configured log level, options are: off, error, warn, info, debug, trace",
},
&cli.StringSliceFlag{
Name: "set",
Aliases: []string{"s"},
Usage: "set a field (identified by a dot path) in the main configuration file, e.g. `\"metrics.type=prometheus\"`",
},
&cli.StringFlag{
Name: "config",
Aliases: []string{"c"},
Value: "",
Usage: "a path to a configuration file",
},
&cli.StringSliceFlag{
Name: "resources",
Aliases: []string{"r"},
Usage: "pull in extra resources from a file, which can be referenced the same as resources defined in the main config, supports glob patterns (requires quotes)",
},
&cli.StringSliceFlag{
Name: "templates",
Aliases: []string{"t"},
Usage: "EXPERIMENTAL: import Benthos templates, supports glob patterns (requires quotes)",
},
&cli.BoolFlag{
Name: "chilled",
Value: false,
Usage: "continue to execute a config containing linter errors",
},
&cli.BoolFlag{
Name: "watcher",
Aliases: []string{"w"},
Value: false,
Usage: "EXPERIMENTAL: watch config files for changes and automatically apply them",
},
}
if len(customFlags) > 0 {
flags = append(flags, customFlags...)
}
app := &cli.App{
Name: "benthos",
Usage: "A stream processor for mundane tasks - https://www.benthos.dev",
Description: `
Either run Benthos as a stream processor or choose a command:
benthos list inputs
benthos create kafka//file > ./config.yaml
benthos -c ./config.yaml
benthos -r "./production/*.yaml" -c ./config.yaml`[1:],
Flags: flags,
Before: func(c *cli.Context) error {
if dotEnvFile := c.String("env-file"); dotEnvFile != "" {
vars, err := parser.ParseDotEnvFile(dotEnvFile)
if err != nil {
fmt.Printf("Failed to read dotenv file: %v\n", err)
os.Exit(1)
}
for k, v := range vars {
if err = os.Setenv(k, v); err != nil {
fmt.Printf("Failed to set env var '%v': %v\n", k, err)
os.Exit(1)
}
}
}
templatesPaths, err := filepath.Globs(c.StringSlice("templates"))
if err != nil {
fmt.Printf("Failed to resolve template glob pattern: %v\n", err)
os.Exit(1)
}
lints, err := template.InitTemplates(templatesPaths...)
if err != nil {
fmt.Fprintf(os.Stderr, "Template file read error: %v\n", err)
os.Exit(1)
}
if !c.Bool("chilled") && len(lints) > 0 {
for _, lint := range lints {
fmt.Fprintln(os.Stderr, lint)
}
fmt.Println("Shutting down due to linter errors, to prevent shutdown run Benthos with --chilled")
os.Exit(1)
}
return nil
},
Action: func(c *cli.Context) error {
if c.Bool("version") {
cmdVersion()
}
if c.Args().Len() > 0 {
fmt.Fprintf(os.Stderr, "Unrecognised command: %v\n", c.Args().First())
cli.ShowAppHelp(c)
os.Exit(1)
}
os.Exit(cmdService(
c.String("config"),
c.StringSlice("resources"),
c.StringSlice("set"),
c.String("log.level"),
!c.Bool("chilled"),
c.Bool("watcher"),
false,
false,
nil,
))
return nil
},
Commands: []*cli.Command{
{
Name: "echo",
Usage: "Parse a config file and echo back a normalised version",
Description: `
This simple command is useful for sanity checking a config if it isn't
behaving as expected, as it shows you a normalised version after environment
variables have been resolved:
benthos -c ./config.yaml echo | less`[1:],
Action: func(c *cli.Context) error {
confReader := readConfig(c.String("config"), false, c.StringSlice("resources"), nil, c.StringSlice("set"))
if _, err := confReader.Read(&conf); err != nil {
fmt.Fprintf(os.Stderr, "Configuration file read error: %v\n", err)
os.Exit(1)
}
var node yaml.Node
err := node.Encode(conf)
if err == nil {
err = config.Spec().SanitiseYAML(&node, docs.SanitiseConfig{
RemoveTypeField: true,
})
}
if err == nil {
var configYAML []byte
if configYAML, err = uconfig.MarshalYAML(node); err == nil {
fmt.Println(string(configYAML))
}
}
if err != nil {
fmt.Fprintf(os.Stderr, "Echo error: %v\n", err)
os.Exit(1)
}
return nil
},
},
lintCliCommand(),
{
Name: "streams",
Usage: "Run Benthos in streams mode",
Description: `
Run Benthos in streams mode, where multiple pipelines can be executed in a
single process and can be created, updated and removed via REST HTTP
endpoints.
benthos streams
benthos -c ./root_config.yaml streams
benthos streams ./path/to/stream/configs ./and/some/more
benthos -c ./root_config.yaml streams ./streams/*.yaml
In streams mode the stream fields of a root target config (input, buffer,
pipeline, output) will be ignored. Other fields will be shared across all
loaded streams (resources, metrics, etc).
For more information check out the docs at:
https://benthos.dev/docs/guides/streams_mode/about`[1:],
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "no-api",
Value: false,
Usage: "Disable the HTTP API for streams mode",
},
},
Action: func(c *cli.Context) error {
os.Exit(cmdService(
c.String("config"),
c.StringSlice("resources"),
c.StringSlice("set"),
c.String("log.level"),
!c.Bool("chilled"),
c.Bool("watcher"),
!c.Bool("no-api"),
true,
c.Args().Slice(),
))
return nil
},
},
listCliCommand(),
createCliCommand(),
test.CliCommand(testSuffix),
clitemplate.CliCommand(),
blobl.CliCommand(),
studio.CliCommand(Version, DateBuilt),
},
}
app.OnUsageError = func(context *cli.Context, err error, isSubcommand bool) error {
flags, notDeprecated := checkDeprecatedFlags(os.Args[1:])
if !notDeprecated {
fmt.Printf("Usage error: %v\n", err)
cli.ShowAppHelp(context)
return err
}
showVersion := flags.Bool(
"version", false, "Display version info, then exit",
)
configPath := flags.String(
"c", "", "Path to a configuration file",
)
flags.Usage = func() {
cli.ShowAppHelp(context)
}
flags.Parse(os.Args[1:])
if *showVersion {
cmdVersion()
}
deprecatedExecute(*configPath, testSuffix)
os.Exit(cmdService(*configPath, nil, nil, "", false, false, false, false, nil))
return nil
}
app.Run(os.Args)
}
//------------------------------------------------------------------------------