/
s3_copy.go
344 lines (295 loc) · 11.5 KB
/
s3_copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
package command
import (
"context"
"path/filepath"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/evergreen-ci/evergreen/agent/internal"
"github.com/evergreen-ci/evergreen/agent/internal/client"
agentutil "github.com/evergreen-ci/evergreen/agent/util"
"github.com/evergreen-ci/evergreen/apimodels"
"github.com/evergreen-ci/evergreen/model/artifact"
"github.com/evergreen-ci/evergreen/util"
"github.com/evergreen-ci/pail"
"github.com/evergreen-ci/utility"
"github.com/mitchellh/mapstructure"
"github.com/mongodb/grip"
"github.com/pkg/errors"
)
// Statuses recorded in the API server's push log for a copy attempt.
const (
	pushLogSuccess = "success"
	pushLogFailed  = "failed"
)
// The S3CopyPlugin consists of zero or more files that are to be copied
// from one location in S3 to the other.
type s3copy struct {
	// AwsKey, AwsSecret, and AwsSessionToken are the user's credentials for
	// authenticating interactions with s3. All three are subject to task
	// expansion (plugin:"expand") before Execute runs.
	AwsKey          string `mapstructure:"aws_key" plugin:"expand"`
	AwsSecret       string `mapstructure:"aws_secret" plugin:"expand"`
	AwsSessionToken string `mapstructure:"aws_session_token" plugin:"expand"`

	// An array of file copy configurations
	S3CopyFiles []*s3CopyFile `mapstructure:"s3_copy_files" plugin:"expand"`

	// base supplies the state shared by all agent commands.
	base
}
// s3CopyFile describes a single source-to-destination copy, optionally
// restricted to a subset of build variants.
type s3CopyFile struct {
	// Each source and destination is specified in the
	// following manner:
	//    bucket: <s3 bucket>
	//    path: <path to file>
	//
	// e.g.
	//    bucket: mciuploads
	//    path: linux-64/x86_64/artifact.tgz
	Source      s3Loc `mapstructure:"source" plugin:"expand"`
	Destination s3Loc `mapstructure:"destination" plugin:"expand"`

	// Permissions is the ACL to apply to the copied file. See:
	//  http://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl
	// for some examples. Defaults to "public-read" (see validate).
	Permissions string `mapstructure:"permissions" plugin:"expand"`

	// BuildVariants is a slice of build variants for which
	// a specified file is to be copied. An empty slice indicates it is to be
	// copied for all build variants.
	BuildVariants []string `mapstructure:"build_variants" plugin:"expand"`

	// DisplayName is the name of the file as shown on the task's artifact
	// list; when blank, the base name of the source path is used.
	DisplayName string `mapstructure:"display_name" plugin:"expand"`

	// Optional, when true suppresses the error state for the file.
	Optional bool `mapstructure:"optional"`
}
// s3Loc is a format for describing the location of a file in
// Amazon's S3. It contains an entry for the bucket name and another
// describing the path name of the file within the bucket.
type s3Loc struct {
	// Region is the s3 region where the bucket is located. It defaults to
	// "us-east-1" (see validate).
	//
	// Fix: the tag was plugin:"region", which is not a directive the
	// expansion machinery recognizes, so expansions such as ${s3_region}
	// were silently never applied to this field. It must be
	// plugin:"expand", matching every other expandable field in this file.
	Region string `mapstructure:"region" plugin:"expand"`

	// Bucket is the s3 bucket for the file.
	Bucket string `mapstructure:"bucket" plugin:"expand"`

	// Path is the file path within the bucket.
	Path string `mapstructure:"path" plugin:"expand"`
}
func s3CopyFactory() Command { return &s3copy{} }
func (c *s3copy) Name() string { return "s3Copy.copy" }
// ParseParams decodes the raw project-config parameter map into this
// command's fields (weakly typed, so YAML scalars coerce as needed) and
// then checks the result for completeness via validate.
func (c *s3copy) ParseParams(params map[string]interface{}) error {
	decoderConf := &mapstructure.DecoderConfig{
		WeaklyTypedInput: true,
		Result:           c,
	}
	decoder, err := mapstructure.NewDecoder(decoderConf)
	if err != nil {
		return errors.WithStack(err)
	}
	if err = decoder.Decode(params); err != nil {
		return errors.Wrap(err, "decoding mapstructure params")
	}
	return c.validate()
}
// validate is a helper function that ensures all
// the fields necessary for carrying out an S3 copy operation are present.
// It also fills in defaults: blank permissions become public-read and
// blank regions become us-east-1. Note that it mutates the copy-file
// entries in place for that reason.
func (c *s3copy) validate() error {
	catcher := grip.NewSimpleCatcher()

	// Credentials are always required.
	if c.AwsKey == "" {
		catcher.New("AWS key cannot be blank")
	}
	if c.AwsSecret == "" {
		catcher.New("AWS secret cannot be blank")
	}

	for _, cf := range c.S3CopyFiles {
		if cf.Source.Path == "" {
			catcher.New("S3 source path cannot be blank")
		}
		if cf.Destination.Path == "" {
			catcher.New("S3 destination path cannot be blank")
		}

		// Apply defaults for the optional settings.
		if cf.Permissions == "" {
			cf.Permissions = s3.BucketCannedACLPublicRead
		}
		if cf.Source.Region == "" {
			cf.Source.Region = endpoints.UsEast1RegionID
		}
		if cf.Destination.Region == "" {
			cf.Destination.Region = endpoints.UsEast1RegionID
		}

		// make sure both buckets are valid; Wrapf is a no-op on nil errors.
		catcher.Wrapf(validateS3BucketName(cf.Source.Bucket), "source bucket name '%s' is invalid", cf.Source.Bucket)
		catcher.Wrapf(validateS3BucketName(cf.Destination.Bucket), "destination bucket name '%s' is invalid", cf.Destination.Bucket)
	}
	return catcher.Resolve()
}
// Execute expands the parameters, and then copies the
// resource from one s3 bucket to another one.
func (c *s3copy) Execute(ctx context.Context,
	comm client.Communicator, logger client.LoggerProducer, conf *internal.TaskConfig) error {
	// Apply task expansions to every field tagged plugin:"expand".
	if err := util.ExpandValues(c, &conf.Expansions); err != nil {
		return errors.Wrap(err, "applying expansions")
	}
	// Re-validate the command here, in case an expansion is not defined.
	if err := c.validate(); err != nil {
		return errors.Wrap(err, "validating params")
	}
	errChan := make(chan error)
	// Run the copy in a goroutine so that this function can observe context
	// cancellation while the copy is in flight. The goroutine's select
	// ensures it does not block forever on errChan if the context is
	// canceled before the result is consumed (i.e. it cannot leak).
	go func() {
		err := errors.WithStack(c.copyWithRetry(ctx, comm, logger, conf))
		select {
		case errChan <- err:
			return
		case <-ctx.Done():
			logger.Task().Infof("Context canceled waiting for s3 copy: %s.", ctx.Err())
			return
		}
	}()
	select {
	case err := <-errChan:
		return err
	case <-ctx.Done():
		// NOTE(review): cancellation is deliberately swallowed here — the
		// command reports success (nil) when the task is aborted mid-copy.
		// Confirm this matches the intended abort semantics for push.
		logger.Execution().Infof("Canceled while running command '%s'", c.Name())
		return nil
	}
}
// copyWithRetry performs each configured S3 copy in sequence. For every
// file it first registers the attempt in the API server's push log (which
// also deduplicates concurrent pushes of the same version), then copies the
// object between buckets with pail, retrying with backoff up to
// maxS3OpAttempts, and finally records the outcome in the push log and
// attaches the artifact to the task.
func (c *s3copy) copyWithRetry(ctx context.Context,
	comm client.Communicator, logger client.LoggerProducer, conf *internal.TaskConfig) error {
	backoffCounter := getS3OpBackoff()
	// A single timer is reused across files and retry attempts; Reset(0)
	// makes the first attempt for each file fire immediately.
	timer := time.NewTimer(0)
	defer timer.Stop()
	td := client.TaskData{ID: conf.Task.Id, Secret: conf.Task.Secret}
	// NOTE(review): this local shadows the imported "client" package for the
	// rest of the function; consider renaming to httpClient.
	client := utility.GetHTTPClient()
	client.Timeout = 10 * time.Minute
	defer utility.PutHTTPClient(client)
	for _, s3CopyFile := range c.S3CopyFiles {
		logger.Task().WarningWhen(strings.Contains(s3CopyFile.Destination.Bucket, "."), "Destination bucket names containing dots that are created after Sept. 30, 2020 are not guaranteed to have valid attached URLs.")
		timer.Reset(0)
		// Skip files that are restricted to build variants other than the
		// one this task is running on.
		if len(s3CopyFile.BuildVariants) > 0 && !utility.StringSliceContains(
			s3CopyFile.BuildVariants, conf.BuildVariant.Name) {
			continue
		}
		if err := ctx.Err(); err != nil {
			return errors.Wrap(err, "command was cancelled")
		}
		logger.Execution().Infof("Making API push copy call to "+
			"transfer %v/%v => %v/%v", s3CopyFile.Source.Bucket,
			s3CopyFile.Source.Path, s3CopyFile.Destination.Bucket,
			s3CopyFile.Destination.Path)
		s3CopyReq := apimodels.S3CopyRequest{
			S3SourceRegion:      s3CopyFile.Source.Region,
			S3SourceBucket:      s3CopyFile.Source.Bucket,
			S3SourcePath:        s3CopyFile.Source.Path,
			S3DestinationRegion: s3CopyFile.Destination.Region,
			S3DestinationBucket: s3CopyFile.Destination.Bucket,
			S3DestinationPath:   s3CopyFile.Destination.Path,
			S3DisplayName:       s3CopyFile.DisplayName,
			S3Permissions:       s3CopyFile.Permissions,
		}
		newPushLog, err := comm.NewPush(ctx, td, &s3CopyReq)
		if err != nil {
			return errors.Wrap(err, "adding push log")
		}
		// An empty TaskId signals that another task already pushed (or is
		// pushing) this file, so this copy is a no-op.
		if newPushLog.TaskId == "" {
			logger.Task().Infof("noop, this version is currently in the process of trying to push, or has already succeeded in pushing the file: '%s/%s'", s3CopyFile.Destination.Bucket, s3CopyFile.Destination.Path)
			continue
		}
		srcOpts := pail.S3Options{
			Credentials: pail.CreateAWSCredentials(c.AwsKey, c.AwsSecret, c.AwsSessionToken),
			Region:      s3CopyReq.S3SourceRegion,
			Name:        s3CopyReq.S3SourceBucket,
			Permissions: pail.S3Permissions(s3CopyReq.S3Permissions),
		}
		srcBucket, err := pail.NewS3MultiPartBucketWithHTTPClient(client, srcOpts)
		if err != nil {
			// Mark the push log failed so a later task can retry the push.
			catcher := grip.NewBasicCatcher()
			catcher.Wrap(err, "initializing S3 source bucket")
			newPushLog.Status = pushLogFailed
			catcher.Wrap(comm.UpdatePushStatus(ctx, td, newPushLog), "updating push log to failed")
			return catcher.Resolve()
		}
		if err := srcBucket.Check(ctx); err != nil {
			catcher := grip.NewBasicCatcher()
			catcher.Wrap(err, "checking bucket")
			newPushLog.Status = pushLogFailed
			catcher.Wrap(comm.UpdatePushStatus(ctx, td, newPushLog), "updating push log to failed")
			return catcher.Resolve()
		}
		destOpts := pail.S3Options{
			Credentials: pail.CreateAWSCredentials(c.AwsKey, c.AwsSecret, c.AwsSessionToken),
			Region:      s3CopyReq.S3DestinationRegion,
			Name:        s3CopyReq.S3DestinationBucket,
			Permissions: pail.S3Permissions(s3CopyReq.S3Permissions),
		}
		destBucket, err := pail.NewS3MultiPartBucket(destOpts)
		if err != nil {
			catcher := grip.NewBasicCatcher()
			catcher.Wrap(err, "initializing S3 destination bucket")
			newPushLog.Status = pushLogFailed
			catcher.Wrap(comm.UpdatePushStatus(ctx, td, newPushLog), "updating push log to failed")
			return catcher.Resolve()
		}
	retryLoop:
		for i := 0; i < maxS3OpAttempts; i++ {
			select {
			case <-ctx.Done():
				return errors.Errorf("command '%s' canceled", c.Name())
			case <-timer.C:
				copyOpts := pail.CopyOptions{
					SourceKey:         s3CopyReq.S3SourcePath,
					DestinationKey:    s3CopyReq.S3DestinationPath,
					DestinationBucket: destBucket,
				}
				err = srcBucket.Copy(ctx, copyOpts)
				if err != nil {
					newPushLog.Status = pushLogFailed
					if err := comm.UpdatePushStatus(ctx, td, newPushLog); err != nil {
						return errors.Wrap(err, "updating push log status failed for task")
					}
					// NOTE(review): these branches look inverted — optional
					// files are retried with backoff while non-optional files
					// fail immediately without using the remaining attempts.
					// Confirm whether non-optional copies were meant to retry
					// and optional ones to be skipped.
					if s3CopyFile.Optional {
						logger.Execution().Error(err)
						logger.Execution().Errorf("S3 push copy failed to copy '%s' to '%s' and file is optional, continuing.",
							s3CopyFile.Source.Path, s3CopyFile.Destination.Bucket)
						timer.Reset(backoffCounter.Duration())
						continue retryLoop
					} else {
						logger.Execution().Errorf("S3 push copy failed to copy '%s' to '%s' and file is not optional, exiting.", s3CopyFile.Source.Path, s3CopyFile.Destination.Bucket)
						return errors.Wrapf(err, "S3 push copy failed to copy '%s' to '%s'", s3CopyFile.Source.Path, s3CopyFile.Destination.Bucket)
					}
				} else {
					newPushLog.Status = pushLogSuccess
					if err := comm.UpdatePushStatus(ctx, td, newPushLog); err != nil {
						return errors.Wrap(err, "updating push log status to success for task")
					}
					if err = c.attachFiles(ctx, comm, logger, td, s3CopyReq); err != nil {
						return errors.Wrap(err, "attaching files")
					}
					break retryLoop
				}
			}
		}
		// NOTE(review): this success message also prints when an optional
		// file exhausted all retry attempts without ever copying — verify
		// whether that case should be reported differently.
		logger.Task().Infof("Successfully copied source file '%s' to destination path '%s'.", s3CopyFile.Source.Path, s3CopyFile.Destination.Path)
	}
	return nil
}
// attachFiles is responsible for sending the specified file to the API
// server, recording it as a task artifact so it appears on the task page.
// The artifact link points at the S3 destination; the display name falls
// back to the base name of the source path when none was configured.
func (c *s3copy) attachFiles(ctx context.Context, comm client.Communicator,
	logger client.LoggerProducer, td client.TaskData, request apimodels.S3CopyRequest) error {
	displayName := request.S3DisplayName
	if displayName == "" {
		displayName = filepath.Base(request.S3SourcePath)
	}
	logger.Execution().Infof("Attaching file '%s'.", displayName)

	// S3 keys always use forward slashes, regardless of host OS.
	remotePath := filepath.ToSlash(request.S3DestinationPath)
	files := []*artifact.File{
		{
			Name: displayName,
			Link: agentutil.S3DefaultURL(request.S3DestinationBucket, remotePath),
		},
	}
	if err := comm.AttachFiles(ctx, td, files); err != nil {
		return errors.Wrapf(err, "attaching file '%s'", displayName)
	}
	logger.Execution().Infof("Successfully attached file '%s'.", displayName)
	return nil
}