/
S3Manager.go
137 lines (110 loc) · 3.71 KB
/
S3Manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package util
import (
"errors"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"io"
"log"
"sort"
"sync"
)
// Constants shared by the S3 helpers in this file.
const (
	// dataType is the MIME type sent as Content-Type for uploaded objects.
	dataType string = "image/jpeg"

	// s3RootUrl is the root URL of the global S3 endpoint.
	s3RootUrl = "https://s3.amazonaws.com"
)
// S3Manager uploads objects to a single S3 bucket and answers existence
// queries from a cached, sorted listing of the bucket's keys.
//
// The exported fields are configuration and are read by Connect and the
// upload/exists helpers; the unexported fields are set up by Connect or
// populated lazily on the first existence query.
type S3Manager struct {
	// Bucket is the name of the S3 bucket every request targets.
	Bucket string
	// Region is the AWS region handed to the S3 client configuration.
	Region string
	// ImagePath is the key prefix prepended to file names of full images.
	ImagePath string
	// ThumbPath is the key prefix prepended to file names of thumbnails.
	ThumbPath string
	// MediumPath is the key prefix prepended to file names of medium images.
	MediumPath string
	// mutex serializes the lazy loading of existingFiles.
	mutex *sync.Mutex
	// existingFiles caches the bucket's object keys in sorted order so that
	// existence checks can use a binary search.
	existingFiles []string
	// NbConcurrentUploads is the capacity given to queue, i.e. the number of
	// uploads allowed to run at the same time.
	NbConcurrentUploads int
	// svc is the S3 client; it is nil until Connect has been called.
	svc *s3.S3
	// queue is a buffered channel used as a semaphore around uploads.
	queue chan bool
}
// Connect creates the S3 client for manager.Region, allocates the upload
// semaphore and the cache mutex, and issues a HeadBucket request against
// manager.Bucket to verify the bucket can be reached.
//
// It returns the error reported by HeadBucket, or nil on success. The other
// methods of S3Manager rely on the state set up here, so Connect has to be
// called first.
func (manager *S3Manager) Connect() error {
	manager.svc = s3.New(&aws.Config{
		Region: aws.String(manager.Region),
	})

	// Every upload acquires a slot by sending on the queue and only frees it
	// when it returns. With a capacity of zero that send could never
	// complete, and a negative capacity would make make() panic, so fall
	// back to one upload at a time when the setting is missing or invalid.
	slots := manager.NbConcurrentUploads
	if slots < 1 {
		slots = 1
	}
	manager.queue = make(chan bool, slots)
	manager.mutex = &sync.Mutex{}

	params := &s3.HeadBucketInput{
		Bucket: aws.String(manager.Bucket), // Required
	}
	_, err := manager.svc.HeadBucket(params)
	return err
}
// UploadImage stores the content of rs in the bucket under the key
// manager.ImagePath + fileName.
//
// The returned url and err are exactly what the internal upload helper
// produces for this request.
func (manager S3Manager) UploadImage(rs io.ReadSeeker, fileName string) (url string, err error) {
	url, err = manager.upload(rs, fileName, manager.ImagePath, "image")
	return url, err
}
// UploadThumb stores the content of rs in the bucket under the key
// manager.ThumbPath + fileName.
//
// The returned url and err are exactly what the internal upload helper
// produces for this request.
func (manager S3Manager) UploadThumb(rs io.ReadSeeker, fileName string) (url string, err error) {
	url, err = manager.upload(rs, fileName, manager.ThumbPath, "thumb")
	return url, err
}
// UploadMedium stores the content of rs in the bucket under the key
// manager.MediumPath + fileName.
//
// The returned url and err are exactly what the internal upload helper
// produces for this request.
func (manager S3Manager) UploadMedium(rs io.ReadSeeker, fileName string) (url string, err error) {
	url, err = manager.upload(rs, fileName, manager.MediumPath, "medium")
	return url, err
}
// upload sends the content of rs to the bucket under the key path + fileName
// with the Content-Type fixed to dataType. imageType is only used to label
// the log lines.
//
// The number of uploads running at once is bounded by the capacity of
// manager.queue: each call takes a slot before talking to S3 and gives it
// back when it returns.
//
// On success it returns the string form of the PutObject response and a nil
// error. If Connect has not been called, or if PutObject fails, it returns
// an empty string and the error.
func (manager S3Manager) upload(rs io.ReadSeeker, fileName string, path string, imageType string) (url string, err error) {
	// Validate before touching the queue: without Connect the queue is a nil
	// channel, and a send on a nil channel blocks forever, so this error
	// would otherwise never be reported.
	if manager.svc == nil {
		return "", errors.New("S3Manager not initialized, Connect should be called first")
	}

	// Take a slot, and release it whatever the outcome of the upload.
	manager.queue <- true
	defer func() { <-manager.queue }()

	log.Printf("Uploading %s %s", imageType, fileName)

	params := &s3.PutObjectInput{
		Bucket:      aws.String(manager.Bucket), // Required
		Key:         aws.String(path + fileName), // Required
		Body:        rs,
		ContentType: aws.String(dataType),
	}
	resp, err := manager.svc.PutObject(params)
	if err != nil {
		// Do not claim success in the log and do not hand back a response
		// string next to a non-nil error.
		return "", err
	}

	log.Printf("%s %s successfully uploaded", imageType, fileName)
	return resp.String(), nil
}
// ExistsImage reports whether the key manager.ImagePath + fileName is
// present in the cached listing of the bucket.
//
// err is non-nil when the listing could not be loaded.
func (manager *S3Manager) ExistsImage(fileName string) (exists bool, err error) {
	exists, err = manager.exists(fileName, manager.ImagePath)
	return exists, err
}
// ExistsThumb reports whether the key manager.ThumbPath + fileName is
// present in the cached listing of the bucket.
//
// err is non-nil when the listing could not be loaded.
func (manager *S3Manager) ExistsThumb(fileName string) (exists bool, err error) {
	exists, err = manager.exists(fileName, manager.ThumbPath)
	return exists, err
}
// ExistsMedium reports whether the key manager.MediumPath + fileName is
// present in the cached listing of the bucket.
//
// err is non-nil when the listing could not be loaded.
func (manager *S3Manager) ExistsMedium(fileName string) (exists bool, err error) {
	exists, err = manager.exists(fileName, manager.MediumPath)
	return exists, err
}
// exists reports whether the key path + fileName appears in the cached
// bucket listing, loading that listing first if necessary.
//
// The cache is kept sorted, so the lookup is a binary search. If loading the
// listing fails, exists returns false together with that error.
func (manager *S3Manager) exists(fileName string, path string) (exists bool, err error) {
	err = manager.initExistingFiles()
	if err != nil {
		return false, err
	}

	key := path + fileName
	keys := manager.existingFiles
	// SearchStrings yields the first index whose entry is >= key, or
	// len(keys) when there is none; the key is present only on an exact hit.
	pos := sort.SearchStrings(keys, key)
	return pos < len(keys) && keys[pos] == key, nil
}
// initExistingFiles loads the keys of every object in manager.Bucket into
// manager.existingFiles, sorted, the first time it is called. Later calls
// return immediately once a listing has been stored.
//
// It returns an error if Connect has not been called or if listing the
// bucket fails; in both cases the cache is left untouched so that the next
// call starts over.
func (manager *S3Manager) initExistingFiles() error {
	// Without Connect both the mutex and the client are nil; report that
	// instead of panicking on a nil pointer.
	if manager.mutex == nil || manager.svc == nil {
		return errors.New("S3Manager not initialized, Connect should be called first")
	}

	manager.mutex.Lock()
	defer manager.mutex.Unlock()

	// A non-nil slice means a listing was already completed, even if the
	// bucket turned out to be empty. Testing the length instead would list
	// an empty bucket again on every call.
	if manager.existingFiles != nil {
		return nil
	}

	log.Printf("Retrieving all images from S3")
	params := &s3.ListObjectsInput{
		Bucket:  aws.String(manager.Bucket), // Required
		MaxKeys: aws.Int64(1000),
	}

	// Collect into a local slice and publish it only after every page has
	// been read and the result sorted. Appending straight into the cache
	// would leave a partial, unsorted listing behind when a page request
	// fails, and later calls would treat it as complete.
	keys := []string{}
	err := manager.svc.ListObjectsPages(params, func(p *s3.ListObjectsOutput, lastPage bool) bool {
		for _, object := range p.Contents {
			if object.Key == nil {
				continue
			}
			keys = append(keys, *object.Key)
		}
		return true // keep paging until the last page
	})
	if err != nil {
		return err
	}

	sort.Strings(keys)
	manager.existingFiles = keys
	log.Printf("%d images retrieved from S3", len(keys))
	return nil
}
// BucketURL returns the S3 client's endpoint followed by "/", the bucket
// name and a trailing "/".
//
// It reads manager.svc, which is nil until Connect has been called.
func (manager S3Manager) BucketURL() string {
	endpoint := manager.svc.Endpoint
	return endpoint + "/" + manager.Bucket + "/"
}