-
Notifications
You must be signed in to change notification settings - Fork 0
/
fetcher_s3.go
126 lines (121 loc) · 2.9 KB
/
fetcher_s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package fetcher
import (
"compress/gzip"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
"github.com/jpillora/s3"
)
//S3 uses authenticated HEAD requests to poll the status of a given
//object. If it detects this file has been updated, it will perform
//an object GET and return its io.Reader stream.
type S3 struct {
//Access key falls back to env AWS_ACCESS_KEY, then metadata
Access string
//Secret key falls back to env AWS_SECRET_ACCESS_KEY, then metadata
Secret string
//Region defaults to ap-southeast-2
Region string
Bucket string
Key string
//Interval between checks
Interval time.Duration
//HeadTimeout defaults to 5 seconds
HeadTimeout time.Duration
//GetTimeout defaults to 5 minutes
GetTimeout time.Duration
//interal state
client *http.Client
delay bool
lastETag string
}
// Init validates the provided config
func (s *S3) Init() error {
if s.Bucket == "" {
return errors.New("S3 bucket not set")
} else if s.Key == "" {
return errors.New("S3 key not set")
}
if s.Region == "" {
s.Region = "ap-southeast-2"
}
//initial etag
if p, _ := os.Executable(); p != "" {
if f, err := os.Open(p); err == nil {
h := md5.New()
io.Copy(h, f)
f.Close()
s.lastETag = hex.EncodeToString(h.Sum(nil))
}
}
//apply defaults
if s.Interval <= 0 {
s.Interval = 5 * time.Minute
}
if s.HeadTimeout <= 0 {
s.HeadTimeout = 5 * time.Second
}
if s.GetTimeout <= 0 {
s.GetTimeout = 5 * time.Minute
}
return nil
}
// Fetch the binary from S3
func (s *S3) Fetch() (io.Reader, error) {
//delay fetches after first
if s.delay {
time.Sleep(s.Interval)
}
s.delay = true
//http client where we change the timeout
c := http.Client{}
//options for this key
creds := s3.AmbientCredentials()
if s.Access != "" && s.Secret != "" {
creds = s3.Credentials(s.Access, s.Secret)
}
opts := []s3.Option{creds, s3.Region(s.Region), s3.Bucket(s.Bucket), s3.Key(s.Key)}
//status check using HEAD
req, err := s3.NewRequest("HEAD", opts...)
if err != nil {
return nil, err
}
c.Timeout = s.HeadTimeout
resp, err := c.Do(req)
if err != nil {
return nil, fmt.Errorf("HEAD request failed (%s)", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("HEAD request failed (%s)", resp.Status)
}
etag := strings.Trim(resp.Header.Get("ETag"), `"`)
if s.lastETag == etag {
return nil, nil //skip, file match
}
s.lastETag = etag
//binary fetch using GET
req, err = s3.NewRequest("GET", opts...)
if err != nil {
return nil, err
}
c.Timeout = s.GetTimeout
resp, err = c.Do(req)
if err != nil {
return nil, fmt.Errorf("GET request failed (%s)", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GET request failed (%s)", resp.Status)
}
//extract gz files
if strings.HasSuffix(s.Key, ".gz") && resp.Header.Get("Content-Encoding") != "gzip" {
return gzip.NewReader(resp.Body)
}
//success!
return resp.Body, nil
}