-
Notifications
You must be signed in to change notification settings - Fork 1
/
oss-uploader.go
132 lines (119 loc) · 2.85 KB
/
oss-uploader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package cloud
import (
"fmt"
"sync"
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/jackytck/alti-cli/errors"
"github.com/jackytck/alti-cli/types"
)
// NewOSSUploader builds an OSSUploader for the project pid that obtains its
// STS credentials through the refresh callback.
//
// The credentials are fetched and the OSS connection is set up before the
// uploader is returned; if that initial Refresh fails, its error is returned
// and no uploader is created.
func NewOSSUploader(pid string, refresh func() (*types.STS, error)) (*OSSUploader, error) {
	uploader := &OSSUploader{
		PID:        pid,
		RefreshSTS: refresh,
	}
	if err := uploader.Refresh(); err != nil {
		return nil, err
	}
	return uploader, nil
}
// OSSUploader takes care of uploading files to a specific location in OSS
// and of refreshing its own STS credentials.
type OSSUploader struct {
	// PID is used as the key prefix of every uploaded object.
	PID string
	// RefreshSTS is called by Refresh to obtain new STS credentials.
	RefreshSTS func() (*types.STS, error)

	// creds holds the most recently obtained STS credentials; nil until the
	// first successful refresh.
	creds *types.STS
	// bucket is the OSS bucket handle built from creds by reconnect.
	bucket *oss.Bucket
	// credsLock guards both creds and bucket.
	credsLock sync.Mutex
	// refreshLock serializes calls to Refresh.
	refreshLock sync.Mutex
}
// Refresh renews the STS credentials and the OSS bucket connection when the
// current credentials are missing or expired; otherwise it does nothing.
//
// Calls are serialized by refreshLock. It returns an error when the stored
// expiry time cannot be parsed, when the RefreshSTS callback is nil, fails or
// yields nil credentials, when the new credentials carry an unparsable expiry
// time, or when the OSS connection cannot be re-established.
//
// New credentials are only kept if they are usable: on any failure after they
// have been installed, the previous credentials are restored so that the next
// call retries instead of treating a broken state as valid.
func (ou *OSSUploader) Refresh() error {
	ou.refreshLock.Lock()
	defer ou.refreshLock.Unlock()

	expired, err := ou.isExpired()
	if err != nil {
		return err
	}
	if !expired {
		return nil
	}

	// The callback is an exported field, so it may have been left unset.
	if ou.RefreshSTS == nil {
		return fmt.Errorf("oss uploader: RefreshSTS callback is nil")
	}
	sts, err := ou.RefreshSTS()
	if err != nil {
		return err
	}
	if sts == nil {
		return fmt.Errorf("oss uploader: RefreshSTS returned nil credentials")
	}

	// reconnect and isExpired both read the stored credentials, so install
	// the new ones first and roll back if they turn out to be unusable.
	prev := ou.getCreds()
	ou.setCreds(sts)

	// Reject credentials whose expiry cannot be parsed: keeping them would
	// make every later isExpired call fail before a refresh is attempted.
	if _, err := ou.isExpired(); err != nil {
		ou.setCreds(prev)
		return err
	}
	// Without the rollback, a failed reconnect would leave unexpired
	// credentials paired with a stale (or nil) bucket handle.
	if err := ou.reconnect(); err != nil {
		ou.setCreds(prev)
		return err
	}
	return nil
}
// PutFile uploads the local file at filepath to the object key
// "<PID>/<cloudPath>" in the uploader's OSS bucket.
//
// The credentials are refreshed first when needed. It returns the error from
// Refresh or from parsing the expiry time if there is one, errors.ErrNOSTS if
// the credentials are still expired after refreshing, an error if no bucket
// connection has been established, and otherwise the result of
// PutObjectFromFile.
func (ou *OSSUploader) PutFile(filepath, cloudPath string) error {
	if err := ou.Refresh(); err != nil {
		return err
	}

	// Defensive: check whether the credentials are still unusable even after
	// the refresh.
	expired, err := ou.isExpired()
	if err != nil {
		return err
	}
	if expired {
		return errors.ErrNOSTS
	}

	// Guard against a missing bucket handle (e.g. an earlier reconnect
	// failed) rather than calling through a nil pointer.
	bucket := ou.getBucket()
	if bucket == nil {
		return fmt.Errorf("oss uploader: no bucket connection for project %q", ou.PID)
	}

	key := fmt.Sprintf("%s/%s", ou.PID, cloudPath)
	return bucket.PutObjectFromFile(key, filepath)
}
// getCreds returns the currently stored STS credentials, reading them under
// credsLock. The result is nil if no credentials have been stored yet.
func (ou *OSSUploader) getCreds() *types.STS {
	ou.credsLock.Lock()
	current := ou.creds
	ou.credsLock.Unlock()
	return current
}
// setCreds replaces the stored STS credentials with creds, writing them under
// credsLock.
func (ou *OSSUploader) setCreds(creds *types.STS) {
	ou.credsLock.Lock()
	ou.creds = creds
	ou.credsLock.Unlock()
}
// getBucket returns the currently stored bucket handle, reading it under
// credsLock. The result is nil if no bucket has been stored yet.
func (ou *OSSUploader) getBucket() *oss.Bucket {
	ou.credsLock.Lock()
	current := ou.bucket
	ou.credsLock.Unlock()
	return current
}
// setBucket replaces the stored bucket handle with bucket, writing it under
// credsLock.
func (ou *OSSUploader) setBucket(bucket *oss.Bucket) {
	ou.credsLock.Lock()
	ou.bucket = bucket
	ou.credsLock.Unlock()
}
// reconnect creates a new OSS client from the stored STS credentials, opens
// the bucket named in those credentials and stores the resulting handle.
//
// It returns the error from creating the client or from opening the bucket;
// in either case the previously stored bucket handle is left unchanged.
// The stored credentials must already be set when this is called.
func (ou *OSSUploader) reconnect() error {
	creds := ou.getCreds()

	// Client authenticated with the temporary key pair plus security token.
	client, err := oss.New(creds.Endpoint, creds.ID, creds.Secret, oss.SecurityToken(creds.Token))
	if err != nil {
		return err
	}

	// Handle for the bucket the credentials refer to.
	bucket, err := client.Bucket(creds.Bucket)
	if err != nil {
		return err
	}

	ou.setBucket(bucket)
	return nil
}
// isExpired reports whether the stored STS credentials can no longer be used.
//
// It returns true with a nil error when no credentials are stored, true with
// the parse error when the Expire field does not match the expected
// timestamp layout, and otherwise whether the current time is after the
// parsed expiry time.
func (ou *OSSUploader) isExpired() (bool, error) {
	creds := ou.getCreds()
	if creds == nil {
		return true, nil
	}

	deadline, err := time.Parse("2006-01-02T15:04:05Z", creds.Expire)
	if err != nil {
		return true, err
	}
	return time.Now().After(deadline), nil
}