/
copy_object.go
153 lines (128 loc) · 4.57 KB
/
copy_object.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package objutil
import (
"context"
"fmt"
"math"
"github.com/couchbase/tools-common/cloud/v3/objstore/objcli"
"github.com/couchbase/tools-common/cloud/v3/objstore/objval"
"github.com/couchbase/tools-common/types/ptr"
)
// CopyObjectOptions encapsulates the available options which can be used when copying an object.
type CopyObjectOptions struct {
	// Options embeds the common operation options shared by the objutil helpers; CopyObject reads
	// the 'Context' and 'PartSize' fields from it, with missing values filled in by 'defaults()'.
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required, and must have permissions for both the source and
	// destination buckets.
	Client objcli.Client

	// DestinationBucket is the bucket which the copied object will be placed in.
	//
	// NOTE: This attribute is required.
	DestinationBucket string

	// DestinationKey is the key which will be used for the copied object.
	//
	// NOTE: This attribute is required.
	DestinationKey string

	// SourceBucket is the bucket in which the object being copied resides in.
	//
	// NOTE: This attribute is required.
	SourceBucket string

	// SourceKey is the key of the source object.
	//
	// NOTE: This attribute is required.
	SourceKey string
}
// CopyObject copies an object from one place to another breaking the request into multiple parts where it's known that
// cloud provider limits will be hit.
//
// NOTE: Client must have permissions to both the source/destination buckets.
func CopyObject(opts CopyObjectOptions) error {
	// Fill out any missing fields with the sane defaults
	opts.defaults()

	// BUG FIX: every request below now uses 'opts.Context' rather than 'context.Background()'; previously only
	// the single-request copy and the abort used the caller's context, so cancellation/timeouts were ignored for
	// the attribute fetch and the whole multipart path.
	attrs, err := opts.Client.GetObjectAttrs(opts.Context, objcli.GetObjectAttrsOptions{
		Bucket: opts.SourceBucket,
		Key:    opts.SourceKey,
	})
	if err != nil {
		return fmt.Errorf("failed to get object attributes: %w", err)
	}

	var (
		size = ptr.From(attrs.Size)
		// NOTE: Named 'maxSize' to avoid shadowing the 'max' builtin (Go 1.21+).
		maxSize = maxSingleOperationCopySize(opts.Client.Provider())
	)

	// If we're able to perform this operation with a single request, do that instead.
	if size <= maxSize {
		copts := objcli.CopyObjectOptions{
			DestinationBucket: opts.DestinationBucket,
			DestinationKey:    opts.DestinationKey,
			SourceBucket:      opts.SourceBucket,
			SourceKey:         opts.SourceKey,
		}

		return opts.Client.CopyObject(opts.Context, copts)
	}

	id, err := opts.Client.CreateMultipartUpload(opts.Context, objcli.CreateMultipartUploadOptions{
		Bucket: opts.DestinationBucket,
		Key:    opts.DestinationKey,
	})
	if err != nil {
		return fmt.Errorf("failed to start multipart upload: %w", err)
	}

	aopts := objcli.AbortMultipartUploadOptions{
		Bucket:   opts.DestinationBucket,
		UploadID: id,
		Key:      opts.DestinationKey,
	}

	// Best-effort cleanup; a no-op once the upload has been completed successfully.
	defer opts.Client.AbortMultipartUpload(opts.Context, aopts) //nolint:errcheck

	var parts []objval.Part

	// cp transfers the given range from the object into the multipart upload.
	//
	// NOTE: We currently perform this operation sequentially, so we don't need to guard access to the 'parts'. There is
	// room for improvement to do this concurrently though, so that must be considered in the future.
	cp := func(start, end int64) error {
		part, err := opts.Client.UploadPartCopy(opts.Context, objcli.UploadPartCopyOptions{
			DestinationBucket: opts.DestinationBucket,
			UploadID:          id,
			DestinationKey:    opts.DestinationKey,
			SourceBucket:      opts.SourceBucket,
			SourceKey:         opts.SourceKey,
			Number:            len(parts) + 1,
			ByteRange:         &objval.ByteRange{Start: start, End: end},
		})
		if err != nil {
			return fmt.Errorf("failed to copy part: %w", err)
		}

		parts = append(parts, part)

		return nil
	}

	// Break the object down into chunks, and perform copy operations for each
	err = chunk(size, opts.PartSize, cp)
	if err != nil {
		return fmt.Errorf("failed to copy parts: %w", err)
	}

	err = opts.Client.CompleteMultipartUpload(opts.Context, objcli.CompleteMultipartUploadOptions{
		Bucket:   opts.DestinationBucket,
		UploadID: id,
		Key:      opts.DestinationKey,
		Parts:    parts,
	})
	if err != nil {
		return fmt.Errorf("failed to complete multipart upload: %w", err)
	}

	return nil
}
// maxSingleOperationCopySize returns an integer representing the point at which copying must be broken down into a
// multipart upload; this is required as some cloud providers have limits on copying objects.
//
// NOTE: If the provider is unknown, a zero value is returned which will trigger multipart uploads which should always
// be valid if only slightly sub-optimal.
func maxSingleOperationCopySize(provider objval.Provider) int64 {
	// Provider-specific single-request copy limits, expressed in decimal (SI) bytes.
	const (
		awsLimit   = 5 * 1000 * 1000 * 1000
		azureLimit = 256 * 1000 * 1000
	)

	switch provider {
	case objval.ProviderAWS:
		return awsLimit
	case objval.ProviderAzure:
		return azureLimit
	case objval.ProviderGCP:
		// Don't trigger the multipart copy behavior for GCP; that's already handled by the SDK.
		return math.MaxInt64
	default:
		// Unknown provider; force the multipart path, which is always valid.
		return 0
	}
}