forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
output_sql_deprecated.go
81 lines (71 loc) · 2.61 KB
/
output_sql_deprecated.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package sql
import (
"github.com/benthosdev/benthos/v4/public/bloblang"
"github.com/benthosdev/benthos/v4/public/service"
)
// sqlDeprecatedOutputConfig builds the configuration spec for the deprecated
// `sql` output. The spec is flagged as deprecated and its description directs
// users to the `sql_insert` and `sql_raw` outputs instead.
//
// The spec declares, in order: the shared driver field, `data_source_name`,
// the field returned by rawQueryField (with an example statement), an optional
// Bloblang `args_mapping`, `max_in_flight` (default 64) and a `batching`
// policy.
func sqlDeprecatedOutputConfig() *service.ConfigSpec {
	return service.NewConfigSpec().
		Deprecated().
		Categories("Services").
		Summary("Executes an arbitrary SQL query for each message.").
		// The `sql_insert` link must target the sql_insert page; pointing it at
		// /docs/components/outputs/sql would send readers back to this
		// deprecated component's own documentation.
		Description(`
## Alternatives
For basic inserts use the ` + "[`sql_insert`](/docs/components/outputs/sql_insert)" + ` output. For more complex queries use the ` + "[`sql_raw`](/docs/components/outputs/sql_raw)" + ` output.`).
		Field(driverField).
		Field(service.NewStringField("data_source_name").Description("Data source name.")).
		Field(rawQueryField().
			Example("INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);")).
		Field(service.NewBloblangField("args_mapping").
			Description("An optional [Bloblang mapping](/docs/guides/bloblang/about) which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `query`.").
			Example("root = [ this.cat.meow, this.doc.woofs[0] ]").
			Example(`root = [ meta("user.id") ]`).
			Optional()).
		Field(service.NewIntField("max_in_flight").
			Description("The maximum number of inserts to run in parallel.").
			Default(64)).
		Field(service.NewBatchPolicyField("batching")).
		Version("3.65.0")
}
// init registers the deprecated output under the name "sql", pairing the spec
// from sqlDeprecatedOutputConfig with a constructor that reads the batching
// policy and max_in_flight from the parsed config before building the output.
// A registration failure panics.
func init() {
	ctor := func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchOutput, service.BatchPolicy, int, error) {
		policy, err := conf.FieldBatchPolicy("batching")
		if err != nil {
			return nil, policy, 0, err
		}

		inFlight, err := conf.FieldInt("max_in_flight")
		if err != nil {
			return nil, policy, inFlight, err
		}

		// The constructed output and its error are handed back together.
		out, err := newSQLDeprecatedOutputFromConfig(conf, mgr)
		return out, policy, inFlight, err
	}

	if err := service.RegisterBatchOutput("sql", sqlDeprecatedOutputConfig(), ctor); err != nil {
		panic(err)
	}
}
//------------------------------------------------------------------------------
// newSQLDeprecatedOutputFromConfig constructs the output behind the deprecated
// `sql` component from a parsed config.
//
// It reads the `driver`, `data_source_name` and `query` string fields, parses
// `args_mapping` only when the config contains it, obtains connection settings
// via connSettingsFromParsed, and passes everything to newSQLRawOutput along
// with the logger from mgr. The first error encountered is returned as-is
// with a nil output.
func newSQLDeprecatedOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*sqlRawOutput, error) {
	var (
		driver, dsn, query string
		mapping            *bloblang.Executor
		err                error
	)

	if driver, err = conf.FieldString("driver"); err != nil {
		return nil, err
	}
	if dsn, err = conf.FieldString("data_source_name"); err != nil {
		return nil, err
	}
	if query, err = conf.FieldString("query"); err != nil {
		return nil, err
	}

	// args_mapping is optional; mapping stays nil when it is absent.
	if conf.Contains("args_mapping") {
		if mapping, err = conf.FieldBloblang("args_mapping"); err != nil {
			return nil, err
		}
	}

	settings, err := connSettingsFromParsed(conf, mgr)
	if err != nil {
		return nil, err
	}

	// NOTE(review): the fifth argument is always nil here; presumably it is a
	// dynamic form of the query that this output never uses. Confirm against
	// newSQLRawOutput.
	return newSQLRawOutput(mgr.Logger(), driver, dsn, query, nil, mapping, settings), nil
}