/
batchupsert.go
84 lines (71 loc) · 1.99 KB
/
batchupsert.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package dygo
import (
"context"
"time"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"golang.org/x/sync/errgroup"
)
const opBatchUpsert = "BatchUpsert"
const maxRetriesUpsert = 3
// BatchUpsertItem performs batch upsert operations on items.
// It takes a context and the number of threads to use for parallel processing.
// It returns an error if any of the batch operations fail.
//
// Example :
//
// newItem := new(Item)
// for i := 0; i < 5; i++ {
// d := dataItem{
// PK: gId,
// SK: "current",
// PhysicalName: "physical_name_1145",
// LogicalName: "logical_name_1145",
// EntityType: "room",
// }
// db.Item(d).AddBatchUpsertItem(newItem)
// }
// err = newItem.BatchUpsertItem(context.Background(), 10)
// if err != nil {
// log.Fatal(err)
// }
func (i *Item) BatchUpsertItem(ctx context.Context, threadCount int) error {
if i.err != nil {
return i.err
}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(threadCount)
for _, batch := range i.batchData.batchPut {
batch := batch
g.Go(func() error {
return i.processBatchUpsert(ctx, batch)
})
}
// Wait for all batch operations to complete
if err := g.Wait(); err != nil {
return dynamoError().method(opBatchUpsert).message(err.Error())
}
return nil
}
// processBatchUpsert processes a batch of write requests and performs batch upsert operation in DynamoDB.
func (i *Item) processBatchUpsert(ctx context.Context, batch map[string][]types.WriteRequest) error {
var retries int
for {
input := &dynamodb.BatchWriteItemInput{
RequestItems: batch,
}
result, err := i.c.client.BatchWriteItem(ctx, input)
if err != nil {
return dynamoError().method(opBatchUpsert).message(err.Error())
}
if len(result.UnprocessedItems) == 0 || retries >= maxRetriesUpsert {
break
}
// Retry unprocessed items
batch = result.UnprocessedItems
retries++
}
// Sleep for 1 second to avoid throttling
time.Sleep(time.Second * 1)
return nil
}