forked from gwatts/dyndump
/
dyndump.go
313 lines (265 loc) · 7.49 KB
/
dyndump.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
// Copyright 2015 Gareth Watts
// Licensed under an MIT license
// See the LICENSE file for details
/*
Package dyndump provides a method for dumping an entire DynamoDB table
using parallel scan operations.
It supports parallel connections to the server for increased throughput along
with rate limiting to a specific read capacity.
Items are written to an ItemWriter interface until the table is exhausted,
or the Stop method is called.
*/
package dyndump
import (
"fmt"
"math"
"time"
"sort"
"sync"
"sync/atomic"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/juju/ratelimit"
)
// limitCalcSize is the number of recent item sizes retained by limitCalc
// when estimating a typical item size for scan-limit tuning.
var (
	limitCalcSize = 50 // number of item sizes to collect when calculating an average
)
// Stats is returned by Fetcher.Stats to return current global throughput statistics.
type Stats struct {
	ItemsRead    int64   // total number of items read so far across all workers
	CapacityUsed float64 // total read capacity units consumed so far across all workers
}
// Fetcher fetches data from DynamoDB at a specified capacity and writes
// fetched items to a writer implementing the ItemWriter interface.
type Fetcher struct {
	Dyn            *dynamodb.DynamoDB
	TableName      string
	ConsistentRead bool       // Setting to true will use double the read capacity.
	MaxParallel    int        // Maximum number of parallel requests to make to Dynamo.
	MaxItems       int64      // Maximum (approximately) number of items to read from Dynamo.
	ReadCapacity   float64    // Average global read capacity to use for the scan.
	Writer         ItemWriter // Retrieved items are sent to this ItemWriter.
	rateLimit      *ratelimit.Bucket // nil when ReadCapacity <= 0 (no rate limiting)
	itemsRead      int64             // running item count; accessed atomically
	capacityUsed   int64             // consumed capacity multiplied by 10; accessed atomically
	stopRequest    chan struct{}     // receives stop requests from Stop or a failed worker
	stopNotify     chan struct{}     // closed to broadcast the stop signal to all workers
	limitCalc      *limitCalc        // rolling item-size history used to tune the scan limit
}
// Run executes the fetcher, starting as many parallel reads as specified by
// the MaxParallel option and returns when the read has finished, failed, or
// been stopped. It returns the first error reported by any worker, or nil.
func (f *Fetcher) Run() error {
	errChan := make(chan error, f.MaxParallel)
	f.stopRequest = make(chan struct{}, 2)
	f.stopNotify = make(chan struct{})
	f.limitCalc = newLimitCalc(limitCalcSize)
	if f.ReadCapacity > 0 {
		f.rateLimit = ratelimit.NewBucketWithQuantum(time.Second, int64(f.ReadCapacity), int64(f.ReadCapacity))
	}
	// Fan out the first stop request to every worker by closing stopNotify.
	go func() {
		<-f.stopRequest
		close(f.stopNotify) // fanout
	}()
	for i := int64(0); i < int64(f.MaxParallel); i++ {
		go f.processSegment(i, errChan)
	}
	var err error
	// wait for all workers to shutdown, keeping the first error seen and
	// asking the remaining workers to stop
	for i := 0; i < f.MaxParallel; i++ {
		if werr := <-errChan; werr != nil {
			if err == nil {
				err = werr
				f.stopRequest <- struct{}{}
			}
		}
	}
	// Release the fanout goroutine: if no stop was ever requested it would
	// otherwise block on stopRequest forever (goroutine leak). The channel
	// is buffered and the send is non-blocking, so this is a no-op when a
	// stop request was already delivered.
	select {
	case f.stopRequest <- struct{}{}:
	default:
	}
	return err
}
// Stop requests a clean shutdown of active readers.
// Active readers will complete the current request and then exit.
// It is safe to call Stop multiple times; calls after the first are no-ops.
func (f *Fetcher) Stop() {
	// Non-blocking send: only the first request matters (the fanout
	// goroutine in Run consumes one and closes stopNotify). A blocking
	// send here could hang the caller once the channel's small buffer
	// filled up from repeated Stop calls or a worker-error stop request.
	select {
	case f.stopRequest <- struct{}{}:
	default:
	}
}
// Stats returns current aggregate statistics about an ongoing or completed run.
// It is safe to call from concurrent goroutines.
func (f *Fetcher) Stats() Stats {
	items := atomic.LoadInt64(&f.itemsRead)
	capacity := atomic.LoadInt64(&f.capacityUsed)
	return Stats{
		ItemsRead:    items,
		CapacityUsed: float64(capacity) / 10, // stored scaled by 10 for atomic updates
	}
}
// isStopped reports whether a stop has been requested, without blocking.
func (f *Fetcher) isStopped() bool {
	select {
	case <-f.stopNotify:
		return true
	default:
	}
	return false
}
// waitForRateLimit performs an interruptible rate-limit wait, charging
// usedCapacity tokens against the rate limiter.
// It returns true if Stop() was called while waiting.
func (f *Fetcher) waitForRateLimit(usedCapacity int64) bool {
	delay := f.rateLimit.Take(usedCapacity)
	if delay <= 0 {
		return false
	}
	// Use an explicit timer instead of time.After so the timer is stopped
	// promptly when the wait is interrupted by a stop request, rather than
	// lingering until it fires.
	timer := time.NewTimer(delay)
	defer timer.Stop()
	select {
	case <-timer.C:
		return false
	case <-f.stopNotify:
		return true
	}
}
// processSegment scans a single table segment, writing each page of items to
// f.Writer until the segment is exhausted, MaxItems is reached, a hard error
// occurs, or a stop is requested. It is executed in a separate goroutine by
// Run for parallel scans, and always sends exactly one value (an error, or
// nil) on doneChan.
func (f *Fetcher) processSegment(segNum int64, doneChan chan<- error) {
	limit := aws.Int64(20) // slow start; retuned by calcLimit once item-size history exists
	if f.rateLimit == nil {
		limit = aws.Int64(0) // unlimit
	}
	params := &dynamodb.ScanInput{
		TableName:              aws.String(f.TableName),
		ConsistentRead:         aws.Bool(f.ConsistentRead),
		Limit:                  limit,
		Segment:                aws.Int64(segNum),
		TotalSegments:          aws.Int64(int64(f.MaxParallel)),
		ReturnConsumedCapacity: aws.String("TOTAL"),
	}
	// Assume one capacity unit for the first request; later iterations use
	// the capacity Dynamo actually reported for the previous page.
	usedCapacity := int64(1)
	for {
		if f.rateLimit != nil {
			if isStopped := f.waitForRateLimit(usedCapacity); isStopped {
				break
			}
		}
		if f.isStopped() {
			break
		}
		// the dynamo service will automatically retry soft errors (including hitting capacity limits)
		// with a backoff algorithm any other errors returned are hard errors
		resp, err := f.Dyn.Scan(params)
		if err != nil {
			doneChan <- fmt.Errorf("read from DynamoDB failed: %s", err)
			return
		}
		// NOTE(review): respSize is accumulated but never read — looks like
		// leftover from an earlier implementation; confirm before removing.
		respSize := 0
		for _, item := range resp.Items {
			if err := f.Writer.WriteItem(item); err != nil {
				doneChan <- fmt.Errorf("write failed: %s", err)
				return
			}
			itemSize := calcItemSize(item)
			respSize += itemSize
			f.limitCalc.addSize(itemSize) // feed the rolling size history used by calcLimit
		}
		atomic.AddInt64(&f.itemsRead, int64(len(resp.Items)))
		// capacityUsed is stored scaled by 10 so it can be updated atomically
		// while preserving one decimal place.
		// NOTE(review): assumes resp.ConsumedCapacity is non-nil because
		// ReturnConsumedCapacity=TOTAL was requested — confirm with SDK docs.
		atomic.AddInt64(&f.capacityUsed, int64(*resp.ConsumedCapacity.CapacityUnits*10))
		// MaxItems is approximate: checked against the global count after
		// each page, so the total may overshoot by up to one page per worker.
		if f.MaxItems > 0 && atomic.LoadInt64(&f.itemsRead) >= f.MaxItems {
			break
		}
		if resp.LastEvaluatedKey == nil {
			// all data scanned
			break
		}
		usedCapacity = int64(math.Ceil(*resp.ConsumedCapacity.CapacityUnits))
		params.ExclusiveStartKey = resp.LastEvaluatedKey
		if f.rateLimit != nil {
			// retune the page size now that item-size history is available
			if newLimit := f.calcLimit(); newLimit > 0 {
				params.Limit = aws.Int64(int64(newLimit))
			}
		}
	}
	doneChan <- nil
}
// calcLimit recomputes the per-request fetch limit so each worker
// approximates its share of the desired global read capacity, making
// effective use of 4k read blocks for small items.
// It returns -1 when not enough item-size history has been collected.
func (f *Fetcher) calcLimit() (newLimit int) {
	// median item size based on recent history
	medianSize := f.limitCalc.median()
	if medianSize <= 0 {
		return -1 // not enough data
	}
	// capacity each parallel worker should consume
	perWorker := f.ReadCapacity / float64(f.MaxParallel)
	// how many median-sized items fit into one 4KB read unit
	itemsPerUnit := 4096.0 / float64(medianSize)
	newLimit = int(itemsPerUnit * perWorker)
	if !f.ConsistentRead {
		// eventually-consistent reads cost half a unit, so twice as many
		// items fit in the same capacity
		newLimit *= 2
	}
	if newLimit < 1 {
		newLimit = 1
	}
	return newLimit
}
// calcItemSize returns the approximate stored size in bytes of one item:
// the sum of each attribute name's length plus the size of its value.
// Based on https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.html#ItemSizeCalculations
func calcItemSize(item map[string]*dynamodb.AttributeValue) (size int) {
	total := 0
	for name, value := range item {
		total += len(name) + calcAttrSize(value)
	}
	return total
}
// calcAttrSize returns an approximate size in bytes of a single attribute
// value. Exactly one field of an AttributeValue is normally set; the cases
// below are evaluated in order and only the first non-nil field is counted.
// NOTE(review): the set/list/map overhead of 3 bytes and the 1-byte
// bool/null sizes are approximations of the AWS guidance — confirm against
// the current developer-guide item-size rules if exact accounting matters.
func calcAttrSize(av *dynamodb.AttributeValue) (size int) {
	switch {
	case av.B != nil: // binary: raw byte length
		size += len(av.B)
	case av.BOOL != nil: // Bool: counted as 1 byte
		size++
	case av.BS != nil: // binary set: 3 bytes overhead plus member bytes
		size += 3
		for _, v := range av.BS {
			size += len(v)
		}
	case av.L != nil: // list of attributes: 3 bytes overhead plus recursive member sizes
		size += 3
		for _, v := range av.L {
			size += calcAttrSize(v)
		}
	case av.M != nil: // map of attributes: 3 bytes overhead plus key lengths and recursive value sizes
		size += 3
		for k, v := range av.M {
			size += len(k) + calcAttrSize(v)
		}
	case av.N != nil: // number: approximated by the length of its decimal string form
		size += len(*av.N)
	case av.NS != nil: // number set
		size += 3
		for _, v := range av.NS {
			size += len(*v)
		}
	case av.NULL != nil: // null: counted as 1 byte
		size++
	case av.S != nil: // string: UTF-8 byte length
		size += len(*av.S)
	case av.SS != nil: // string set
		size += 3
		for _, v := range av.SS {
			size += len(*v)
		}
	}
	return size
}
// limitCalc tracks the sizes of recently fetched items in a fixed-size ring
// buffer so an approximate median can be computed for scan-limit tuning.
// It is safe for use by concurrent goroutines.
type limitCalc struct {
	m         sync.Mutex
	itemSizes []int // ring buffer of recent item sizes
	offset    int64 // total sizes recorded; next write goes to offset % len(itemSizes)
}

// newLimitCalc returns a limitCalc that retains the last size item sizes.
func newLimitCalc(size int) *limitCalc {
	return &limitCalc{itemSizes: make([]int, size)}
}

// addSize records the size of one fetched item, overwriting the oldest
// entry once the ring buffer is full.
func (lc *limitCalc) addSize(size int) {
	lc.m.Lock()
	defer lc.m.Unlock()
	lc.itemSizes[lc.offset%int64(len(lc.itemSizes))] = size
	lc.offset++
}

// median returns the approximate median of the recorded sizes, or -1 if the
// ring buffer has not been filled yet.
func (lc *limitCalc) median() int {
	lc.m.Lock()
	defer lc.m.Unlock()
	if lc.offset < int64(len(lc.itemSizes)) {
		return -1 // not enough data
	}
	// Sort a copy: sorting itemSizes in place would scramble the ring
	// buffer's insertion order, causing addSize to overwrite arbitrary
	// entries instead of the oldest one.
	sorted := make([]int, len(lc.itemSizes))
	copy(sorted, lc.itemSizes)
	sort.Ints(sorted)
	return sorted[len(sorted)/2] // close enough to median
}