/
s3.go
139 lines (118 loc) · 3.03 KB
/
s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Package s3blobstore writes your blobs to AWS S3.
package s3blobstore
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/function61/gokit/aws/s3facade"
"github.com/function61/gokit/logex"
"github.com/function61/varasto/pkg/stotypes"
)
// s3blobstore stores blobs in an AWS S3 bucket.
type s3blobstore struct {
	blobNamer *s3BlobNamer // maps blob refs to S3 object keys under the configured prefix
	bucket *s3facade.BucketContext // S3 client plus target bucket name
	logl *logex.Leveled // leveled logging wrapper around the injected logger
}
// New creates an S3-backed blobstore from a serialized options string
// (see deserializeConfig for the expected "bucket:prefix:..." format).
func New(opts string, logger *log.Logger) (*s3blobstore, error) {
	cfg, err := deserializeConfig(opts)
	if err != nil {
		return nil, err
	}

	// Object keys are built as prefix + encoded ref, so the separator must
	// already be part of the prefix itself.
	if !strings.HasSuffix(cfg.Prefix, "/") {
		return nil, fmt.Errorf("prefix needs to end in '/'; got '%s'", cfg.Prefix)
	}

	creds := credentials.NewStaticCredentials(cfg.AccessKeyId, cfg.AccessKeySecret, "")

	bucket, err := s3facade.Bucket(cfg.Bucket, s3facade.Credentials(creds), cfg.RegionId)
	if err != nil {
		return nil, err
	}

	return &s3blobstore{
		blobNamer: &s3BlobNamer{cfg.Prefix},
		bucket:    bucket,
		logl:      logex.Levels(logger),
	}, nil
}
// RawFetch downloads the blob identified by ref from S3. A missing object is
// reported as os.ErrNotExist so callers can use the standard not-found
// convention. The caller is responsible for closing the returned body.
func (s *s3blobstore) RawFetch(ctx context.Context, ref stotypes.BlobRef) (io.ReadCloser, error) {
	res, err := s.bucket.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: s.bucket.Name,
		Key:    s.blobNamer.Ref(ref),
	})
	if err != nil {
		// Use a distinct name for the asserted error instead of shadowing err.
		if aerr, ok := err.(awserr.Error); ok && aerr.Code() == s3.ErrCodeNoSuchKey {
			return nil, os.ErrNotExist
		}
		// %w keeps the underlying AWS error inspectable via errors.Is/As.
		return nil, fmt.Errorf("s3 GetObject: %w", err)
	}

	return res.Body, nil
}
// RawStore uploads the blob content to S3 under the key derived from ref.
// The entire content is buffered in memory before upload.
func (s *s3blobstore) RawStore(ctx context.Context, ref stotypes.BlobRef, content io.Reader) error {
	// since S3 internally requires retry support, it requires a io.ReadSeeker and thus
	// we're forced to buffer
	// (ioutil.ReadAll kept over io.ReadAll so the file's ioutil import stays used)
	buf, err := ioutil.ReadAll(content)
	if err != nil {
		// add context so a read failure isn't confused with an upload failure
		return fmt.Errorf("s3 PutObject: buffering content: %w", err)
	}

	if _, err := s.bucket.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
		Bucket: s.bucket.Name,
		Key:    s.blobNamer.Ref(ref),
		Body:   bytes.NewReader(buf),
	}); err != nil {
		// %w keeps the underlying AWS error inspectable via errors.Is/As.
		return fmt.Errorf("s3 PutObject: %w", err)
	}

	return nil
}
// RoutingCost returns the relative routing cost for this blobstore.
func (s *s3blobstore) RoutingCost() int {
	const cost = 20 // NOTE(review): scale/meaning of this value isn't visible in this file
	return cost
}
// s3BlobNamer derives S3 object keys for blob refs under a fixed key prefix.
type s3BlobNamer struct {
	prefix string // ends in "/" (validated in New)
}
// Ref returns the S3 object key for the given blob ref: the configured
// prefix followed by the unpadded base64url encoding of the ref bytes.
func (n *s3BlobNamer) Ref(ref stotypes.BlobRef) *string {
	key := n.prefix + base64.RawURLEncoding.EncodeToString([]byte(ref))
	return aws.String(key)
}
// Config holds the connection details for an S3 bucket. It round-trips
// through the colon-separated form "bucket:prefix:accessKeyId:secret:region".
type Config struct {
	Bucket          string
	Prefix          string
	RegionId        string
	AccessKeyId     string
	AccessKeySecret string
}

// Serialize encodes the config as "bucket:prefix:accessKeyId:secret:region".
func (c *Config) Serialize() string {
	fields := []string{c.Bucket, c.Prefix, c.AccessKeyId, c.AccessKeySecret, c.RegionId}
	return strings.Join(fields, ":")
}

// deserializeConfig parses the colon-separated form produced by Serialize.
func deserializeConfig(serialized string) (*Config, error) {
	parts := strings.Split(serialized, ":")
	if len(parts) != 5 {
		return nil, errors.New("s3 options not in format bucket:prefix:accessKeyId:secret:region")
	}

	conf := &Config{}
	conf.Bucket, conf.Prefix = parts[0], parts[1]
	conf.AccessKeyId, conf.AccessKeySecret = parts[2], parts[3]
	conf.RegionId = parts[4]
	return conf, nil
}