forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
create.go
176 lines (159 loc) · 5.03 KB
/
create.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package cli
import (
"errors"
"fmt"
"os"
"strings"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v3"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component/cache"
"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/component/output"
"github.com/benthosdev/benthos/v4/internal/component/processor"
"github.com/benthosdev/benthos/v4/internal/component/ratelimit"
"github.com/benthosdev/benthos/v4/internal/config"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/pipeline"
)
func addExpression(conf *config.Type, expression string) error {
var inputTypes, processorTypes, outputTypes []string
componentTypes := strings.Split(expression, "/")
for i, str := range componentTypes {
for _, t := range strings.Split(str, ",") {
if t = strings.TrimSpace(t); len(t) > 0 {
switch i {
case 0:
inputTypes = append(inputTypes, t)
case 1:
processorTypes = append(processorTypes, t)
case 2:
outputTypes = append(outputTypes, t)
default:
return errors.New("more component separators than expected")
}
}
}
}
if lInputs := len(inputTypes); lInputs == 1 {
t := inputTypes[0]
if _, exists := bundle.AllInputs.DocsFor(t); exists {
conf.Input.Type = t
} else {
return fmt.Errorf("unrecognised input type '%v'", t)
}
} else if lInputs > 1 {
conf.Input.Type = "broker"
for _, t := range inputTypes {
c := input.NewConfig()
if _, exists := bundle.AllInputs.DocsFor(t); exists {
c.Type = t
} else {
return fmt.Errorf("unrecognised input type '%v'", t)
}
conf.Input.Broker.Inputs = append(conf.Input.Broker.Inputs, c)
}
}
for _, t := range processorTypes {
c := processor.NewConfig()
if _, exists := bundle.AllProcessors.DocsFor(t); exists {
c.Type = t
} else {
return fmt.Errorf("unrecognised processor type '%v'", t)
}
conf.Pipeline.Processors = append(conf.Pipeline.Processors, c)
}
if lOutputs := len(outputTypes); lOutputs == 1 {
t := outputTypes[0]
if _, exists := bundle.AllOutputs.DocsFor(t); exists {
conf.Output.Type = t
} else {
return fmt.Errorf("unrecognised output type '%v'", t)
}
} else if lOutputs > 1 {
conf.Output.Type = "broker"
for _, t := range outputTypes {
c := output.NewConfig()
if _, exists := bundle.AllOutputs.DocsFor(t); exists {
c.Type = t
} else {
return fmt.Errorf("unrecognised output type '%v'", t)
}
conf.Output.Broker.Outputs = append(conf.Output.Broker.Outputs, c)
}
}
return nil
}
type minimalCreateConfig struct {
Input input.Config `json:"input" yaml:"input"`
Pipeline pipeline.Config `json:"pipeline" yaml:"pipeline"`
Output output.Config `json:"output" yaml:"output"`
ResourceCaches []cache.Config `json:"cache_resources,omitempty" yaml:"cache_resources,omitempty"`
ResourceRateLimits []ratelimit.Config `json:"rate_limit_resources,omitempty" yaml:"rate_limit_resources,omitempty"`
}
func createCliCommand() *cli.Command {
return &cli.Command{
Name: "create",
Usage: "Create a new Benthos config",
Description: `
Prints a new Benthos config to stdout containing specified components
according to an expression. The expression must take the form of three
comma-separated lists of inputs, processors and outputs, divided by
forward slashes:
benthos create stdin/bloblang,awk/nats
benthos create file,http_server/protobuf/http_client
If the expression is omitted a default config is created.`[1:],
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "small",
Aliases: []string{"s"},
Value: false,
Usage: "Print only the main components of a Benthos config (input, pipeline, output) and omit all fields marked as advanced.",
},
},
Action: func(c *cli.Context) error {
conf := config.New()
if expression := c.Args().First(); len(expression) > 0 {
if err := addExpression(&conf, expression); err != nil {
fmt.Fprintf(os.Stderr, "Generate error: %v\n", err)
os.Exit(1)
}
}
var filter docs.FieldFilter
var iconf any = conf
if c.Bool("small") {
iconf = minimalCreateConfig{
Input: conf.Input,
Pipeline: conf.Pipeline,
Output: conf.Output,
ResourceCaches: conf.ResourceCaches,
ResourceRateLimits: conf.ResourceRateLimits,
}
filter = func(spec docs.FieldSpec, _ any) bool {
return !spec.IsAdvanced
}
}
var node yaml.Node
err := node.Encode(iconf)
if err == nil {
sanitConf := docs.NewSanitiseConfig()
sanitConf.RemoveTypeField = true
sanitConf.RemoveDeprecated = true
sanitConf.ForExample = true
sanitConf.Filter = filter
err = config.Spec().SanitiseYAML(&node, sanitConf)
}
if err == nil {
var configYAML []byte
if configYAML, err = config.MarshalYAML(node); err == nil {
fmt.Println(string(configYAML))
}
}
if err != nil {
fmt.Fprintf(os.Stderr, "Generate error: %v\n", err)
os.Exit(1)
}
return nil
},
}
}