forked from argoproj/argo-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3.go
103 lines (94 loc) · 3.13 KB
/
s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package s3
import (
"time"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/pkg/file"
argos3 "github.com/argoproj/pkg/s3"
)
// S3ArtifactDriver is a driver for AWS S3.
//
// The fields are plain connection settings; newS3Client copies each of them
// into the argos3.S3ClientOpts field of the same name.
type S3ArtifactDriver struct {
	// Endpoint and Region are forwarded unchanged to the S3 client options.
	Endpoint, Region string
	// Secure is forwarded unchanged to the S3 client options.
	// NOTE(review): presumably toggles TLS for the connection — confirm
	// against argos3.S3ClientOpts.
	Secure bool
	// AccessKey and SecretKey are forwarded unchanged to the S3 client options.
	AccessKey, SecretKey string
}
// newS3Client builds an S3 client from the driver's settings.
//
// Endpoint, Region, Secure, AccessKey and SecretKey are copied into an
// argos3.S3ClientOpts value, and the client and error produced by
// argos3.NewS3Client for those options are returned as is.
func (s3Driver *S3ArtifactDriver) newS3Client() (argos3.S3Client, error) {
	return argos3.NewS3Client(argos3.S3ClientOpts{
		Endpoint:  s3Driver.Endpoint,
		Region:    s3Driver.Region,
		Secure:    s3Driver.Secure,
		AccessKey: s3Driver.AccessKey,
		SecretKey: s3Driver.SecretKey,
	})
}
// Load downloads artifacts from S3 compliant storage.
//
// The object identified by inputArtifact.S3.Bucket and inputArtifact.S3.Key is
// fetched to the local path. When the fetch fails with the S3 error code
// "NoSuchKey", the key is probed as an S3 "directory" and, if it is one, its
// contents are fetched with GetDirectory instead.
//
// Attempts are driven by wait.ExponentialBackoff (initial delay 2s, factor 2,
// 5 steps, jitter 0.1). Failing to create the client, to get the file (for any
// reason other than NoSuchKey), to probe the directory, or to get the
// directory is logged as a warning and retried. A NoSuchKey for a key that is
// not a directory stops the retries and that original error is returned.
// In every other case the result of wait.ExponentialBackoff is returned.
//
// NOTE(review): inputArtifact is dereferenced without a nil check (and so is
// inputArtifact.S3 if that field is a pointer); callers must pass an artifact
// that carries an S3 location.
func (s3Driver *S3ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error {
	backoff := wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}
	return wait.ExponentialBackoff(backoff, func() (bool, error) {
		bucket, key := inputArtifact.S3.Bucket, inputArtifact.S3.Key
		log.Infof("S3 Load path: %s, key: %s", path, key)

		// A fresh client is built on every attempt, so a failure here is
		// retried like any other failure.
		s3cli, err := s3Driver.newS3Client()
		if err != nil {
			log.Warnf("Failed to create new S3 client: %v", err)
			return false, nil
		}

		origErr := s3cli.GetFile(bucket, key, path)
		if origErr == nil {
			return true, nil
		}
		if !argos3.IsS3ErrCode(origErr, "NoSuchKey") {
			log.Warnf("Failed get file: %v", origErr)
			return false, nil
		}

		// If we get here, the error was a NoSuchKey. The key might be a s3 "directory"
		isDir, err := s3cli.IsDirectory(bucket, key)
		if err != nil {
			// Report the key: it is the key, not the bucket, that is being
			// tested for being a directory.
			log.Warnf("Failed to test if %s is a directory: %v", key, err)
			return false, nil
		}
		if !isDir {
			// It's neither a file, nor a directory. Return the original
			// NoSuchKey error, which also ends the retries.
			return false, origErr
		}

		if err = s3cli.GetDirectory(bucket, key, path); err != nil {
			log.Warnf("Failed get directory: %v", err)
			return false, nil
		}
		return true, nil
	})
}
// Save saves an artifact to S3 compliant storage.
//
// The local path is first classified with file.IsDirectory. That check is
// made once, before any upload attempt: it is a local filesystem test, so a
// failure is logged and returned immediately instead of being retried through
// the whole backoff schedule.
//
// The upload itself targets outputArtifact.S3.Bucket / outputArtifact.S3.Key
// and uses PutDirectory when path is a directory and PutFile otherwise.
// Attempts are driven by wait.ExponentialBackoff (initial delay 2s, factor 2,
// 5 steps, jitter 0.1). Failing to create the client or to upload is logged
// as a warning and retried; the result of wait.ExponentialBackoff is returned.
//
// NOTE(review): outputArtifact is dereferenced without a nil check (and so is
// outputArtifact.S3 if that field is a pointer); callers must pass an artifact
// that carries an S3 location.
func (s3Driver *S3ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error {
	isDir, err := file.IsDirectory(path)
	if err != nil {
		log.Warnf("Failed to test if %s is a directory: %v", path, err)
		return err
	}

	backoff := wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}
	return wait.ExponentialBackoff(backoff, func() (bool, error) {
		log.Infof("S3 Save path: %s, key: %s", path, outputArtifact.S3.Key)

		// A fresh client is built on every attempt, so a failure here is
		// retried like an upload failure.
		s3cli, cliErr := s3Driver.newS3Client()
		if cliErr != nil {
			log.Warnf("Failed to create new S3 client: %v", cliErr)
			return false, nil
		}

		if isDir {
			if putErr := s3cli.PutDirectory(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); putErr != nil {
				log.Warnf("Failed to put directory: %v", putErr)
				return false, nil
			}
			return true, nil
		}

		if putErr := s3cli.PutFile(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); putErr != nil {
			log.Warnf("Failed to put file: %v", putErr)
			return false, nil
		}
		return true, nil
	})
}