// s3.go — S3 uploader for the uploader package.
package uploader
import (
"errors"
"os"
"path"
"io"
"context"
"github.com/Netflix/metrics-client-go/metrics"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/sirupsen/logrus"
)
const (
defaultS3ContentType = "text/plain"
defaultS3ACL = s3.ObjectCannedACLBucketOwnerFullControl
defaultS3PartSize = 64 * 1024 * 1024 // 64MB per part
)
// CountingReader wraps an io.Reader and tracks the cumulative number of
// bytes read through it.
type CountingReader struct {
	reader io.Reader
	// BytesRead is the total number of bytes returned by Read so far.
	BytesRead int
}

// Read delegates to the wrapped reader and adds n to BytesRead.
//
// n is counted even when err is non-nil: the io.Reader contract allows an
// implementation to return n > 0 together with an error (including io.EOF),
// and those final bytes must not be dropped from the count. The previous
// version only counted on err == nil, undercounting such readers.
func (r *CountingReader) Read(p []byte) (n int, err error) {
	n, err = r.reader.Read(p)
	r.BytesRead += n
	return
}
// S3Uploader uploads logs to S3 using the AWS SDK's s3manager multipart
// uploader. Construct it with NewS3Uploader; the zero value is not usable.
type S3Uploader struct {
	log        logrus.FieldLogger  // logger for upload progress and close failures
	bucketName string              // destination S3 bucket
	s3Uploader *s3manager.Uploader // configured with defaultS3PartSize in NewS3Uploader
	metrics    metrics.Reporter    // sink for the titus.executor.S3Uploader.uploadMB counter
}
// NewS3Uploader creates a new instance of an S3 uploader
func NewS3Uploader(m metrics.Reporter, log logrus.FieldLogger, bucket string) Uploader {
region, err := getEC2Region()
if err != nil {
panic(err)
}
u := &S3Uploader{
log: log,
bucketName: bucket,
metrics: m,
}
session, err := session.NewSession(&aws.Config{
Logger: &logAdapter{log},
Region: ®ion,
})
if err != nil {
panic(err)
}
u.s3Uploader = s3manager.NewUploader(session, func(u *s3manager.Uploader) {
u.PartSize = defaultS3PartSize
})
return u
}
// getEC2Region resolves the AWS region used for uploads.
//
// It prefers the EC2_REGION environment variable; when unset, it falls back
// to the EC2 instance metadata service. An error is returned when neither
// source can supply a region (e.g. running outside EC2 with no EC2_REGION).
func getEC2Region() (string, error) {
	// Explicit override wins — useful outside EC2 and in tests.
	if region := os.Getenv("EC2_REGION"); region != "" {
		return region, nil
	}
	sess := session.Must(session.NewSession())
	ec2metadatasvc := ec2metadata.New(sess)
	if !ec2metadatasvc.Available() {
		// Error strings are lowercase and unpunctuated per Go convention.
		return "", errors.New("unable to determine EC2 region, and EC2 metadata service unavailable")
	}
	return ec2metadatasvc.Region()
}
// Upload writes the single local file `local` to s3://<bucket>/<remote>.
//
// The content type is inferred via ctypeFunc; an empty result falls back to
// defaultS3ContentType. Returns any error from opening the file or from the
// S3 upload. A failure to close the file is logged, not returned.
func (u *S3Uploader) Upload(ctx context.Context, local string, remote string, ctypeFunc ContentTypeInferenceFunction) error {
	u.log.Printf("Attempting to upload file from: %s to: %s", local, path.Join(u.bucketName, remote))
	f, err := os.Open(local)
	if err != nil {
		return err
	}
	// Register the close immediately after a successful open so the file is
	// released even if ctypeFunc panics. A fresh variable is used instead of
	// reassigning the outer err: the close result is only logged.
	defer func() {
		if cerr := f.Close(); cerr != nil {
			u.log.Printf("Failed to close %s: %s", f.Name(), cerr)
		}
	}()
	contentType := ctypeFunc(local)
	if contentType == "" {
		contentType = defaultS3ContentType
	}
	return u.uploadFile(ctx, f, remote, contentType)
}
// uploadFile streams the contents of `local` to s3://<bucket>/<remote> via
// the configured s3manager uploader, then reports the uploaded volume in
// whole megabytes on the titus.executor.S3Uploader.uploadMB counter. An
// empty contentType falls back to defaultS3ContentType.
// (Doc comment fixed: it previously named the exported "UploadFile", which
// did not match this unexported function.)
func (u *S3Uploader) uploadFile(ctx context.Context, local io.Reader, remote string, contentType string) error {
	u.log.Printf("Attempting to upload file from: %s to: %s", local, path.Join(u.bucketName, remote))
	if contentType == "" {
		contentType = defaultS3ContentType
	}
	// Wrap the input so the number of bytes actually read can be reported.
	reader := &CountingReader{reader: local}
	result, err := u.s3Uploader.UploadWithContext(ctx, &s3manager.UploadInput{
		ACL:         aws.String(defaultS3ACL),
		ContentType: aws.String(contentType),
		Bucket:      aws.String(u.bucketName),
		Key:         aws.String(remote),
		// reader already satisfies io.Reader; no explicit conversion needed.
		Body: reader,
	})
	if err != nil {
		return err
	}
	// BytesRead is already an int; the previous int(...) conversion was a no-op.
	u.metrics.Counter("titus.executor.S3Uploader.uploadMB", reader.BytesRead/1024/1024, nil)
	u.log.Printf("Successfully uploaded file from: %s to: %s", local, result.Location)
	return nil
}
// UploadPartOfFile uploads length bytes of local, starting at offset start,
// to s3://<bucket>/<remote>. It does not preserve the reader's cursor
// position. An empty contentType falls back to defaultS3ContentType.
func (u *S3Uploader) UploadPartOfFile(ctx context.Context, local io.ReadSeeker, start, length int64, remote, contentType string) error {
	// Position the reader at the requested offset before limiting the span.
	_, err := local.Seek(start, io.SeekStart)
	if err != nil {
		return err
	}
	if len(contentType) == 0 {
		contentType = defaultS3ContentType
	}
	return u.uploadFile(ctx, io.LimitReader(local, length), remote, contentType)
}
// logAdapter bridges a logrus-style standard logger to the AWS SDK's Logger
// interface, which expects a single variadic Log method.
type logAdapter struct {
	log logrus.StdLogger
}

// Log forwards all arguments unchanged to the underlying logger's Print.
func (la *logAdapter) Log(args ...interface{}) {
	la.log.Print(args...)
}