-
Notifications
You must be signed in to change notification settings - Fork 789
/
executor.go
146 lines (129 loc) · 4.73 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package cosmosdb
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
"github.com/gofrs/uuid"
"github.com/benthosdev/benthos/v4/public/service"
)
const (
	// maxTransactionalBatchSize is the upper bound on how many messages may be
	// pushed to Azure in a single CosmosDB TransactionalBatch.
	//
	// References:
	//   - https://learn.microsoft.com/en-us/azure/cosmos-db/concepts-limits#per-request-limits
	//   - https://github.com/Azure/azure-cosmos-dotnet-v3/issues/1057
	maxTransactionalBatchSize = 100
)
// ExecMessageBatch creates a CosmosDB TransactionalBatch from the provided message batch and executes it.
//
// The partition key is obtained by evaluating config.PartitionKeys against the first message of the
// batch (index 0) and is used for the whole transactional batch. One operation is then queued per
// message, selected by config.Operation. When config.Operation is OperationCreate and config.AutoID
// is set, a generated UUID v4 is stored under the "id" key of any message object that lacks one.
//
// Parameters:
//   - ctx: passed through to client.ExecuteTransactionalBatch.
//   - batch: the messages to convert into operations; must not exceed maxTransactionalBatchSize.
//   - client: the container client used to build and execute the transactional batch.
//   - config: the operation type, partition key mapping, item ID and patch settings.
//   - enableContentResponseOnWrite: forwarded as TransactionalBatchOptions.EnableContentResponseOnWrite.
//
// It returns the response of client.ExecuteTransactionalBatch. A zero-valued response and a non-nil
// error are returned when the batch is too large, the partition key cannot be resolved, a message
// cannot be converted into an operation, or the configured operation / patch operation is not one of
// the supported values. Underlying errors are wrapped with %w so callers can inspect them with
// errors.Is / errors.As.
func ExecMessageBatch(ctx context.Context, batch service.MessageBatch, client *azcosmos.ContainerClient,
	config CRUDConfig, enableContentResponseOnWrite bool,
) (azcosmos.TransactionalBatchResponse, error) {
	if len(batch) > maxTransactionalBatchSize {
		return azcosmos.TransactionalBatchResponse{},
			fmt.Errorf("current batch has %d messages, but the CosmosDB transactional batch limit is %d", len(batch), maxTransactionalBatchSize)
	}

	// The partition key is computed once, from the first message, and applied to every operation.
	pkQueryResult, err := batch.BloblangQueryValue(0, config.PartitionKeys)
	if err != nil {
		return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to evaluate partition key values: %w", err)
	}

	// TODO: Enable support for hierarchical / empty Partition Keys when the following issues are addressed:
	// - https://github.com/Azure/azure-sdk-for-go/issues/18578
	// - https://github.com/Azure/azure-sdk-for-go/issues/21063
	if pkValuesList, ok := pkQueryResult.([]any); ok {
		if len(pkValuesList) != 1 {
			return azcosmos.TransactionalBatchResponse{}, errors.New("only one partition key is supported")
		}
		pkQueryResult = pkValuesList[0]
	}

	pkValue, err := GetTypedPartitionKeyValue(pkQueryResult)
	if err != nil {
		return azcosmos.TransactionalBatchResponse{}, err
	}

	tb := client.NewTransactionalBatch(pkValue)
	for idx, msg := range batch {
		var b []byte
		if config.Operation == OperationCreate && config.AutoID {
			// The "id" field may be injected below, so request a structure that is safe to
			// mutate rather than the read-only view returned by AsStructured.
			structuredMsg, err := msg.AsStructuredMut()
			if err != nil {
				return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to get structured message: %w", err)
			}

			obj, ok := structuredMsg.(map[string]any)
			if !ok {
				return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("message must contain an object, got %T instead", structuredMsg)
			}

			// Only generate an ID when the document does not already carry one.
			if _, hasID := obj["id"]; !hasID {
				u4, err := uuid.NewV4()
				if err != nil {
					return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to generate uuid: %w", err)
				}
				obj["id"] = u4.String()
			}

			if b, err = json.Marshal(obj); err != nil {
				return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to marshal message to json: %w", err)
			}
		} else {
			var err error
			if b, err = msg.AsBytes(); err != nil {
				return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to get message bytes: %w", err)
			}
		}

		// NOTE(review): String does not surface interpolation errors; if ItemID offers TryString
		// (as PatchCondition does) it may be preferable here — confirm against CRUDConfig.
		var id string
		if config.ItemID != nil {
			id = config.ItemID.String(msg)
		}

		switch config.Operation {
		case OperationCreate:
			tb.CreateItem(b, nil)
		case OperationDelete:
			tb.DeleteItem(id, nil)
		case OperationReplace:
			tb.ReplaceItem(id, b, nil)
		case OperationUpsert:
			tb.UpsertItem(b, nil)
		case OperationRead:
			tb.ReadItem(id, nil)
		case OperationPatch:
			patch := azcosmos.PatchOperations{}

			if config.PatchCondition != nil {
				condition, err := config.PatchCondition.TryString(msg)
				if err != nil {
					return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to get patch condition: %w", err)
				}
				// An empty condition is treated as "no condition".
				if condition != "" {
					patch.SetCondition(condition)
				}
			}

			for _, po := range config.PatchOperations {
				path, err := po.Path.TryString(msg)
				if err != nil {
					return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to get patch path: %w", err)
				}

				// The value stays nil when no value mapping is configured for this operation.
				var value any
				if po.Value != nil {
					if value, err = batch.BloblangQueryValue(idx, po.Value); err != nil {
						return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to evaluate patch value: %w", err)
					}
				}

				switch po.Operation {
				case patchOperationAdd:
					patch.AppendAdd(path, value)
				case patchOperationIncrement:
					v, ok := value.(int64)
					if !ok {
						return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("expected patch value to be int64, got %T", value)
					}
					patch.AppendIncrement(path, v)
				case patchOperationRemove:
					patch.AppendRemove(path)
				case patchOperationReplace:
					patch.AppendReplace(path, value)
				case patchOperationSet:
					patch.AppendSet(path, value)
				default:
					// Fail loudly instead of silently dropping an unrecognised patch operation.
					return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("unsupported patch operation: %v", po.Operation)
				}
			}

			tb.PatchItem(id, patch, nil)
		default:
			// Fail loudly instead of executing a batch that is missing this message's operation.
			return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("unsupported operation: %v", config.Operation)
		}
	}

	return client.ExecuteTransactionalBatch(ctx, tb, &azcosmos.TransactionalBatchOptions{
		EnableContentResponseOnWrite: enableContentResponseOnWrite,
	})
}