-
Notifications
You must be signed in to change notification settings - Fork 783
/
processor.go
368 lines (317 loc) · 12.5 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
package mongodb
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"github.com/benthosdev/benthos/v4/internal/bloblang/field"
"github.com/benthosdev/benthos/v4/internal/bloblang/mapping"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component/metrics"
iprocessor "github.com/benthosdev/benthos/v4/internal/component/processor"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/impl/mongodb/client"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/message"
"github.com/benthosdev/benthos/v4/internal/old/processor"
"github.com/benthosdev/benthos/v4/internal/old/util/retries"
"github.com/benthosdev/benthos/v4/internal/shutdown"
"github.com/benthosdev/benthos/v4/internal/tracing"
)
//------------------------------------------------------------------------------
// init registers the mongodb processor constructor and its documentation spec
// with the global processor bundle. Registration failure is a programming
// error and panics.
func init() {
	err := bundle.AllProcessors.Add(func(c processor.Config, nm bundle.NewManagement) (iprocessor.V1, error) {
		v2Proc, err := NewProcessor(c, nm, nm.Logger(), nm.Metrics())
		if err != nil {
			return nil, err
		}
		return iprocessor.NewV2BatchedToV1Processor("", v2Proc, nm.Metrics()), nil
	}, docs.ComponentSpec{
		Name:       processor.TypeMongoDB,
		Type:       docs.TypeProcessor,
		Status:     docs.StatusExperimental,
		Version:    "3.43.0",
		Categories: []string{"Integration"},
		Summary:    `Performs operations against MongoDB for each message, allowing you to store or retrieve data within message payloads.`,
		Config: docs.FieldComponent().WithChildren(
			client.ConfigDocs().Add(
				processorOperationDocs(client.OperationInsertOne),
				docs.FieldString("collection", "The name of the target collection in the MongoDB DB.").IsInterpolated(),
				docs.FieldObject(
					"write_concern",
					"The write_concern settings for the mongo connection.",
				).WithChildren(writeConcernDocs()...),
				docs.FieldBloblang(
					"document_map",
					"A bloblang map representing the records in the mongo db. Used to generate the document for mongodb by "+
						"mapping the fields in the message to the mongodb fields. The document map is required for the operations "+
						"insert-one, replace-one and update-one.",
					mapExamples()...,
				),
				docs.FieldBloblang(
					"filter_map",
					"A bloblang map representing the filter for the mongo db command. The filter map is required for all operations except "+
						"insert-one. It is used to find the document(s) for the operation. For example in a delete-one case, the filter map should "+
						"have the fields required to locate the document to delete.",
					mapExamples()...,
				),
				docs.FieldBloblang(
					"hint_map",
					"A bloblang map representing the hint for the mongo db command. This map is optional and is used with all operations "+
						"except insert-one. It is used to improve performance of finding the documents in the mongodb.",
					mapExamples()...,
				),
				docs.FieldBool(
					"upsert",
					// The separating space after the comma lives in the first
					// fragment so the concatenated sentence renders correctly.
					"The upsert setting is optional and only applies for update-one and replace-one operations. If the filter specified in filter_map matches, "+
						"the document is updated or replaced accordingly, otherwise it is created.",
				).HasDefault(false).AtVersion("3.60.0"),
				docs.FieldString(
					"json_marshal_mode",
					"The json_marshal_mode setting is optional and controls the format of the output message.",
				).HasDefault(client.JSONMarshalModeCanonical).Advanced().HasAnnotatedOptions(
					string(client.JSONMarshalModeCanonical), "A string format that emphasizes type preservation at the expense of readability and interoperability. "+
						"That is, conversion from canonical to BSON will generally preserve type information except in certain specific cases. ",
					// Space added between the two sentences of this description.
					string(client.JSONMarshalModeRelaxed), "A string format that emphasizes readability and interoperability at the expense of type preservation. "+
						"That is, conversion from relaxed format to BSON can lose type information.",
				).AtVersion("3.60.0"),
			).Merge(retries.FieldSpecs())...,
		).ChildDefaultAndTypesFromStruct(processor.NewMongoDBConfig()),
	})
	if err != nil {
		panic(err)
	}
}
//------------------------------------------------------------------------------
// Processor runs a single configured MongoDB operation for every message of
// a batch, either writing data derived from the message or replacing the
// message contents with a document read from the database.
type Processor struct {
	// Component configuration and plumbing.
	conf    processor.MongoDBConfig
	log     log.Modular
	stats   metrics.Type
	shutSig *shutdown.Signaller

	// Connection handles and the options applied to every collection handle.
	client                       *mongo.Client
	database                     *mongo.Database
	writeConcernCollectionOption *options.CollectionOptions

	// Per-message behaviour: which operation to run, the (interpolated)
	// collection name, and the bloblang mappings that produce the filter,
	// document and hint. Mappings not used by the operation are left nil.
	operation   client.Operation
	collection  *field.Expression
	filterMap   *mapping.Executor
	documentMap *mapping.Executor
	hintMap     *mapping.Executor
}
// NewProcessor returns a MongoDB processor.
//
// All configuration (operation, connection settings, bloblang mappings, write
// concern and collection expression) is validated before any network activity
// takes place. Only then is the client connected and pinged; if the ping fails
// the client is disconnected again so the connection is not leaked.
//
// An error is returned when the operation is unknown, a required setting is
// missing, a setting is supplied for an operation that does not allow it, a
// mapping or expression fails to parse, the write concern is invalid, or the
// server cannot be reached.
func NewProcessor(
	conf processor.Config, mgr bundle.NewManagement, log log.Modular, stats metrics.Type,
) (iprocessor.V2Batched, error) {
	// TODO: V4 Remove this after V4 lands and #972 is fixed
	operation := client.NewOperation(conf.MongoDB.Operation)
	if operation == client.OperationInvalid {
		return nil, fmt.Errorf("mongodb operation '%s' unknown: must be insert-one, delete-one, delete-many, replace-one, update-one or find-one", conf.MongoDB.Operation)
	}

	m := &Processor{
		conf:      conf.MongoDB,
		log:       log,
		stats:     stats,
		operation: operation,
		shutSig:   shutdown.NewSignaller(),
	}

	if conf.MongoDB.MongoDB.URL == "" {
		return nil, errors.New("mongo url must be specified")
	}
	if conf.MongoDB.MongoDB.Database == "" {
		return nil, errors.New("mongo database must be specified")
	}
	if conf.MongoDB.MongoDB.Collection == "" {
		return nil, errors.New("mongo collection must be specified")
	}

	bEnv := mgr.BloblEnvironment()
	var err error

	// filter_map: mandatory where the operation uses a filter, forbidden
	// otherwise.
	if isFilterAllowed(m.operation) {
		if conf.MongoDB.FilterMap == "" {
			return nil, errors.New("mongodb filter_map must be specified")
		}
		if m.filterMap, err = bEnv.NewMapping(conf.MongoDB.FilterMap); err != nil {
			return nil, fmt.Errorf("failed to parse filter_map: %w", err)
		}
	} else if conf.MongoDB.FilterMap != "" {
		return nil, fmt.Errorf("mongodb filter_map not allowed for '%s' operation", conf.MongoDB.Operation)
	}

	// document_map: mandatory where the operation uses a document, forbidden
	// otherwise.
	if isDocumentAllowed(m.operation) {
		if conf.MongoDB.DocumentMap == "" {
			return nil, errors.New("mongodb document_map must be specified")
		}
		if m.documentMap, err = bEnv.NewMapping(conf.MongoDB.DocumentMap); err != nil {
			return nil, fmt.Errorf("failed to parse document_map: %w", err)
		}
	} else if conf.MongoDB.DocumentMap != "" {
		return nil, fmt.Errorf("mongodb document_map not allowed for '%s' operation", conf.MongoDB.Operation)
	}

	// hint_map: always optional, but forbidden where the operation does not
	// support hints.
	if conf.MongoDB.HintMap != "" {
		if !isHintAllowed(m.operation) {
			return nil, fmt.Errorf("mongodb hint_map not allowed for '%s' operation", conf.MongoDB.Operation)
		}
		if m.hintMap, err = bEnv.NewMapping(conf.MongoDB.HintMap); err != nil {
			return nil, fmt.Errorf("failed to parse hint_map: %w", err)
		}
	}

	if !isUpsertAllowed(m.operation) && conf.MongoDB.Upsert {
		return nil, fmt.Errorf("mongodb upsert not allowed for '%s' operation", conf.MongoDB.Operation)
	}

	var timeout time.Duration
	if timeout, err = time.ParseDuration(conf.MongoDB.WriteConcern.WTimeout); err != nil {
		return nil, fmt.Errorf("failed to parse wtimeout string: %w", err)
	}

	wcOpts := []writeconcern.Option{
		writeconcern.J(conf.MongoDB.WriteConcern.J),
		writeconcern.WTimeout(timeout),
	}
	// w is either a number of acknowledging nodes or a tag such as
	// "majority". Both constructors only build an option, so the option has to
	// be handed to writeconcern.New to take effect. An empty value leaves w
	// unset.
	if wStr := conf.MongoDB.WriteConcern.W; wStr != "" {
		if w, convErr := strconv.Atoi(wStr); convErr == nil {
			wcOpts = append(wcOpts, writeconcern.W(w))
		} else {
			wcOpts = append(wcOpts, writeconcern.WTagSet(wStr))
		}
	}
	writeConcern := writeconcern.New(wcOpts...)

	// This does some validation so we don't have to
	if _, _, err = writeConcern.MarshalBSONValue(); err != nil {
		return nil, fmt.Errorf("write_concern validation error: %w", err)
	}

	if m.collection, err = bEnv.NewField(m.conf.MongoDB.Collection); err != nil {
		return nil, fmt.Errorf("failed to parse collection expression: %w", err)
	}

	// Everything that can be checked offline has passed; now touch the network.
	if m.client, err = conf.MongoDB.MongoDB.Client(); err != nil {
		return nil, fmt.Errorf("failed to create mongodb client: %w", err)
	}
	if err = m.client.Connect(context.Background()); err != nil {
		return nil, fmt.Errorf("failed to connect: %w", err)
	}
	if err = m.client.Ping(context.Background(), nil); err != nil {
		// Best-effort cleanup: the ping failure is the error worth reporting,
		// so a secondary disconnect error is deliberately dropped.
		_ = m.client.Disconnect(context.Background())
		return nil, fmt.Errorf("ping failed: %w", err)
	}

	m.database = m.client.Database(conf.MongoDB.MongoDB.Database)
	m.writeConcernCollectionOption = options.Collection().SetWriteConcern(writeConcern)

	return m, nil
}
// ProcessBatch applies the configured MongoDB operation to every message of
// the batch and returns a single resulting batch (a copy of the input).
//
// find-one is executed immediately per message and replaces the message
// contents with the found document, marshalled as extended JSON. Every other
// operation is converted into a write model; models are grouped by target
// collection name and executed as one bulk write per collection after the
// whole batch has been walked. When a bulk write fails, the messages that
// contributed to that bulk write are flagged with the error. The returned
// error is always nil; failures are reported on the individual messages.
func (m *Processor) ProcessBatch(ctx context.Context, spans []*tracing.Span, batch *message.Batch) ([]*message.Batch, error) {
	newBatch := batch.Copy()

	// pendingWrites gathers the write models aimed at one collection together
	// with the batch indexes of the messages that produced them.
	type pendingWrites struct {
		collection *mongo.Collection
		models     []mongo.WriteModel
		indexes    []int
	}
	// Grouping is keyed by collection name: Database.Collection returns a new
	// handle on every call, so handle pointers cannot be used to group writes.
	writesByName := map[string]*pendingWrites{}
	// nameOrder keeps first-seen order so bulk writes run deterministically.
	var nameOrder []string

	filterWanted := isFilterAllowed(m.operation)
	documentWanted := isDocumentAllowed(m.operation)

	processor.IteratePartsWithSpanV2("mongodb", nil, newBatch, func(i int, _ *tracing.Span, p *message.Part) error {
		var err error
		var filterVal, documentVal *message.Part

		if filterWanted {
			if filterVal, err = m.filterMap.MapPart(i, newBatch); err != nil {
				return fmt.Errorf("failed to execute filter_map: %v", err)
			}
			// A mapping can succeed yet yield no part.
			if filterVal == nil {
				return errors.New("failed to generate filterVal")
			}
		}
		if documentWanted {
			if documentVal, err = m.documentMap.MapPart(i, newBatch); err != nil {
				return fmt.Errorf("failed to execute document_map: %v", err)
			}
			if documentVal == nil {
				return errors.New("failed to generate documentVal")
			}
		}

		var docJSON, filterJSON, hintJSON interface{}
		if filterWanted {
			if filterJSON, err = filterVal.JSON(); err != nil {
				return err
			}
		}
		if documentWanted {
			if docJSON, err = documentVal.JSON(); err != nil {
				return err
			}
		}
		if m.hintMap != nil {
			hintVal, err := m.hintMap.MapPart(i, newBatch)
			if err != nil {
				return fmt.Errorf("failed to execute hint_map: %v", err)
			}
			// Same guard as for the filter and document mappings; without it
			// a nil result would be dereferenced below.
			if hintVal == nil {
				return errors.New("failed to generate hintVal")
			}
			if hintJSON, err = hintVal.JSON(); err != nil {
				return err
			}
		}

		// Each model keeps a pointer to its own copy of the upsert flag.
		upsertVal := m.conf.Upsert
		collectionName := m.collection.String(i, newBatch)

		var writeModel mongo.WriteModel
		switch m.operation {
		case client.OperationInsertOne:
			writeModel = &mongo.InsertOneModel{
				Document: docJSON,
			}
		case client.OperationDeleteOne:
			writeModel = &mongo.DeleteOneModel{
				Filter: filterJSON,
				Hint:   hintJSON,
			}
		case client.OperationDeleteMany:
			writeModel = &mongo.DeleteManyModel{
				Filter: filterJSON,
				Hint:   hintJSON,
			}
		case client.OperationReplaceOne:
			writeModel = &mongo.ReplaceOneModel{
				Upsert:      &upsertVal,
				Filter:      filterJSON,
				Replacement: docJSON,
				Hint:        hintJSON,
			}
		case client.OperationUpdateOne:
			writeModel = &mongo.UpdateOneModel{
				Upsert: &upsertVal,
				Filter: filterJSON,
				Update: docJSON,
				Hint:   hintJSON,
			}
		case client.OperationFindOne:
			// Reads are not batched: run the query now and overwrite the
			// message with the result.
			collection := m.database.Collection(collectionName, m.writeConcernCollectionOption)
			findOptions := &options.FindOneOptions{Hint: hintJSON}

			var decoded interface{}
			if findErr := collection.FindOne(ctx, filterJSON, findOptions).Decode(&decoded); findErr != nil {
				// A missing document is an expected outcome and is not logged.
				if !errors.Is(findErr, mongo.ErrNoDocuments) {
					m.log.Errorf("Error decoding mongo db result, filter = %v: %s", filterJSON, findErr)
				}
				return findErr
			}
			data, err := bson.MarshalExtJSON(decoded, m.conf.JSONMarshalMode == client.JSONMarshalModeCanonical, false)
			if err != nil {
				return err
			}
			p.Set(data)
			return nil
		}
		if writeModel == nil {
			return nil
		}

		pending, exists := writesByName[collectionName]
		if !exists {
			pending = &pendingWrites{
				collection: m.database.Collection(collectionName, m.writeConcernCollectionOption),
			}
			writesByName[collectionName] = pending
			nameOrder = append(nameOrder, collectionName)
		}
		pending.models = append(pending.models, writeModel)
		pending.indexes = append(pending.indexes, i)
		return nil
	})

	// failed maps a batch index to the bulk write error that affected it.
	failed := map[int]error{}
	for _, name := range nameOrder {
		pending := writesByName[name]
		// Every entry holds at least one write model.
		if _, err := pending.collection.BulkWrite(ctx, pending.models); err != nil {
			m.log.Errorf("Bulk write failed in mongodb processor: %v", err)
			for _, idx := range pending.indexes {
				failed[idx] = err
			}
		}
	}
	if len(failed) > 0 {
		// The callback never returns an error, so Iter's result carries no
		// information.
		_ = newBatch.Iter(func(i int, p *message.Part) error {
			if err, ok := failed[i]; ok {
				iprocessor.MarkErr(p, spans[i], err)
			}
			return nil
		})
	}

	return []*message.Batch{newBatch}, nil
}
// Close disconnects the underlying MongoDB client, passing ctx through to the
// driver, and reports any error the disconnect produced.
func (m *Processor) Close(ctx context.Context) error {
	if err := m.client.Disconnect(ctx); err != nil {
		return err
	}
	return nil
}