/
azure_table_storage.go
216 lines (196 loc) · 6.56 KB
/
azure_table_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
//go:build !wasm
// +build !wasm
package writer
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Jeffail/benthos/v3/internal/bloblang/field"
"github.com/Jeffail/benthos/v3/internal/interop"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
// AzureTableStorage is a benthos writer.Type implementation that writes
// messages to an Azure Table Storage table.
type AzureTableStorage struct {
	// conf is the configuration the writer was built from; its InsertType is
	// consulted on every write.
	conf AzureTableStorageConfig
	// tableName, partitionKey and rowKey are interpolated expressions that are
	// resolved once per message part at write time.
	tableName *field.Expression
	partitionKey *field.Expression
	rowKey *field.Expression
	// properties maps an entity property name to the expression producing its
	// value. When it is empty the message payload is parsed as JSON instead.
	properties map[string]*field.Expression
	// client is the table service client derived from the storage account
	// client created in the constructor.
	client storage.TableServiceClient
	// timeout is parsed from conf.Timeout by the constructor.
	// NOTE(review): no method in this file reads it - confirm whether it
	// should drive the table creation timeout.
	timeout time.Duration
	// log receives non-fatal errors encountered while writing.
	log log.Modular
	// stats is stored by the constructor; it is not read in this file.
	stats metrics.Type
}
// NewAzureTableStorage builds an Azure Table Storage writer without a real
// resource manager: it forwards to NewAzureTableStorageV2 using the no-op
// manager returned by types.NoopMgr.
//
// Deprecated: use the V2 API instead.
func NewAzureTableStorage(
	conf AzureTableStorageConfig,
	log log.Modular,
	stats metrics.Type,
) (*AzureTableStorage, error) {
	noopMgr := types.NoopMgr()
	return NewAzureTableStorageV2(conf, noopMgr, log, stats)
}
// NewAzureTableStorageV2 creates a new Azure Table Storage writer Type.
//
// The storage client is chosen from conf as follows:
//   - if StorageConnectionString is set and contains
//     "UseDevelopmentStorage=true;" an emulator client is used;
//   - if StorageConnectionString is set otherwise, the client is built from
//     that connection string;
//   - otherwise a basic client is built from StorageAccount and
//     StorageAccessKey.
//
// An error is returned when neither StorageAccount nor StorageConnectionString
// is provided, when the client cannot be created, when conf.Timeout is
// non-empty but not a valid duration, or when any of the table name,
// partition key, row key or property expressions fails to parse. Underlying
// errors are wrapped so callers can inspect them with errors.Is / errors.As.
func NewAzureTableStorageV2(
	conf AzureTableStorageConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (*AzureTableStorage, error) {
	var timeout time.Duration
	var err error
	// An empty timeout string is allowed and leaves the timeout at zero.
	if tout := conf.Timeout; len(tout) > 0 {
		if timeout, err = time.ParseDuration(tout); err != nil {
			return nil, fmt.Errorf("failed to parse timeout period string: %w", err)
		}
	}
	if conf.StorageAccount == "" && conf.StorageConnectionString == "" {
		return nil, errors.New("invalid azure storage account credentials")
	}

	var client storage.Client
	if conf.StorageConnectionString != "" {
		if strings.Contains(conf.StorageConnectionString, "UseDevelopmentStorage=true;") {
			client, err = storage.NewEmulatorClient()
		} else {
			client, err = storage.NewClientFromConnectionString(conf.StorageConnectionString)
		}
	} else {
		client, err = storage.NewBasicClient(conf.StorageAccount, conf.StorageAccessKey)
	}
	if err != nil {
		return nil, fmt.Errorf("invalid azure storage account credentials: %w", err)
	}

	a := &AzureTableStorage{
		conf:    conf,
		log:     log,
		stats:   stats,
		timeout: timeout,
		client:  client.GetTableService(),
	}
	if a.tableName, err = interop.NewBloblangField(mgr, conf.TableName); err != nil {
		return nil, fmt.Errorf("failed to parse table name expression: %w", err)
	}
	if a.partitionKey, err = interop.NewBloblangField(mgr, conf.PartitionKey); err != nil {
		return nil, fmt.Errorf("failed to parse partition key expression: %w", err)
	}
	if a.rowKey, err = interop.NewBloblangField(mgr, conf.RowKey); err != nil {
		return nil, fmt.Errorf("failed to parse row key expression: %w", err)
	}

	a.properties = make(map[string]*field.Expression, len(conf.Properties))
	for property, value := range conf.Properties {
		if a.properties[property], err = interop.NewBloblangField(mgr, value); err != nil {
			// Name the property so a bad expression can be located in the config.
			return nil, fmt.Errorf("failed to parse property expression '%v': %w", property, err)
		}
	}
	return a, nil
}
// ConnectWithContext attempts to establish a connection to the target Table
// Storage Account. It delegates to Connect; ctx is not used.
func (a *AzureTableStorage) ConnectWithContext(ctx context.Context) error {
	return a.Connect()
}
// Connect attempts to establish a connection to the target Table Storage
// Account. It performs no work and always returns nil: the table service
// client is already created by the constructor.
func (a *AzureTableStorage) Connect() error {
	return nil
}
// Write sends the contents of msg to Azure Table Storage as table entities.
// It is a convenience wrapper around WriteWithContext that supplies a
// background context.
func (a *AzureTableStorage) Write(msg types.Message) error {
	ctx := context.Background()
	return a.WriteWithContext(ctx, msg)
}
// WriteWithContext attempts to write every part of msg to Azure Table Storage
// as a table entity.
//
// For each part the table name, partition key and row key expressions are
// resolved and the entity properties are built (see entityProperties).
// Entities are grouped by table and partition key, and each group is sent as
// one or more batches of at most 100 entities. Each batch is executed
// separately, so a group larger than 100 entities is not written atomically.
//
// If a batch fails because the table does not exist, the table is created and
// the batch is retried once. The first error encountered is returned and any
// remaining batches are not sent.
//
// wctx is currently not consulted by this implementation.
func (a *AzureTableStorage) WriteWithContext(wctx context.Context, msg types.Message) error {
	// Azure Table Storage rejects entity group transactions that contain more
	// than 100 operations, so larger partition groups are split into chunks.
	const maxBatchSize = 100

	// table name -> partition key -> entities destined for that partition.
	writeReqs := make(map[string]map[string][]*storage.Entity)
	if err := IterateBatchedSend(msg, func(i int, p types.Part) error {
		tableName := a.tableName.String(i, msg)
		// Resolve the partition key exactly once: evaluating the expression a
		// second time could yield a different value for non-deterministic
		// expressions, filing the entity under a group it does not belong to.
		partitionKey := a.partitionKey.String(i, msg)
		entity := &storage.Entity{
			PartitionKey: partitionKey,
			RowKey:       a.rowKey.String(i, msg),
			Properties:   a.entityProperties(i, p, msg),
		}
		if writeReqs[tableName] == nil {
			writeReqs[tableName] = make(map[string][]*storage.Entity)
		}
		writeReqs[tableName][partitionKey] = append(writeReqs[tableName][partitionKey], entity)
		return nil
	}); err != nil {
		return err
	}

	for tn, pks := range writeReqs {
		table := a.client.GetTableReference(tn)
		for _, entities := range pks {
			for start := 0; start < len(entities); start += maxBatchSize {
				end := start + maxBatchSize
				if end > len(entities) {
					end = len(entities)
				}
				tableBatch := table.NewBatch()
				for _, entity := range entities[start:end] {
					entity.Table = table
					if err := a.createBatch(tableBatch, a.conf.InsertType, entity); err != nil {
						return err
					}
				}
				if err := a.executeBatch(table, tableBatch); err != nil {
					return err
				}
			}
		}
	}
	return nil
}

// entityProperties builds the property map for the entity created from part i
// of msg. With no configured property expressions the part payload is parsed
// as a JSON object and nested arrays/objects are re-encoded as JSON strings;
// otherwise each configured expression is resolved against the message.
// Encoding problems are logged rather than returned, so the write proceeds
// with whatever properties could be produced.
func (a *AzureTableStorage) entityProperties(i int, p types.Part, msg types.Message) map[string]interface{} {
	props := make(map[string]interface{})
	if len(a.properties) > 0 {
		for property, value := range a.properties {
			props[property] = value.String(i, msg)
		}
		return props
	}

	if err := json.Unmarshal(p.Get(), &props); err != nil {
		a.log.Errorf("error unmarshalling message: %v.", err)
	}
	for property, v := range props {
		switch v.(type) {
		case []interface{}, map[string]interface{}:
			// Structured values are flattened to their JSON text.
			m, err := json.Marshal(v)
			if err != nil {
				a.log.Errorf("error marshalling property: %v.", property)
			}
			props[property] = string(m)
		}
	}
	return props
}

// executeBatch runs batch against table. When the service reports
// "TableNotFound" it attempts to create the table and runs the batch a second
// time, returning the outcome of that retry. A failure to create the table is
// only logged, since the retry result is what decides the returned error.
func (a *AzureTableStorage) executeBatch(table *storage.Table, batch *storage.TableBatch) error {
	err := batch.ExecuteBatch()
	if err == nil {
		return nil
	}
	var svcErr storage.AzureStorageServiceError
	if !errors.As(err, &svcErr) || svcErr.Code != "TableNotFound" {
		return err
	}
	if cerr := table.Create(uint(10), storage.FullMetadata, nil); cerr != nil {
		a.log.Errorf("error creating table: %v.", cerr)
	}
	// retry
	return batch.ExecuteBatch()
}
// createBatch queues entity onto tableBatch using the operation selected by
// insertType: "INSERT" adds an insert, "INSERT_MERGE" an insert-or-merge and
// "INSERT_REPLACE" an insert-or-replace. Any other value yields an error and
// leaves the batch untouched. Nothing is sent to the service here.
func (a *AzureTableStorage) createBatch(tableBatch *storage.TableBatch, insertType string, entity *storage.Entity) error {
	if insertType == "INSERT" {
		tableBatch.InsertEntity(entity)
		return nil
	}
	if insertType == "INSERT_MERGE" {
		tableBatch.InsertOrMergeEntity(entity, true)
		return nil
	}
	if insertType == "INSERT_REPLACE" {
		tableBatch.InsertOrReplaceEntity(entity, true)
		return nil
	}
	return fmt.Errorf("invalid insert type")
}
// CloseAsync begins cleaning up resources used by this writer asynchronously.
// It is a no-op: the body is empty.
func (a *AzureTableStorage) CloseAsync() {
}
// WaitForClose will block until either the writer is closed or a specified
// timeout occurs. This implementation ignores the timeout argument and
// returns nil immediately.
func (a *AzureTableStorage) WaitForClose(time.Duration) error {
	return nil
}
//------------------------------------------------------------------------------