/
api.go
210 lines (167 loc) · 6.18 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package objgcp
import (
"context"
"io"
"cloud.google.com/go/storage"
)
//go:generate mockery --all --case underscore --inpackage

// serviceAPI is a top level interface which allows interactions with Google cloud storage.
type serviceAPI interface {
	// Bucket returns a handle scoped to the named bucket.
	Bucket(name string) bucketAPI
	// Close releases resources held by the underlying client.
	Close() error
}
// serviceClient implements the 'serviceAPI' interface and encapsulates the Google SDK into a unit testable interface.
type serviceClient struct {
	c *storage.Client // underlying Google Storage SDK client
}

// Bucket returns a wrapped handle scoped to the named bucket.
func (s serviceClient) Bucket(name string) bucketAPI {
	return bucketHandle{h: s.c.Bucket(name)}
}

// Close closes the underlying SDK client, releasing any associated resources.
func (s serviceClient) Close() error {
	return s.c.Close()
}
// bucketAPI is a bucket level interface which allows interactions with a Google Storage bucket.
type bucketAPI interface {
	// Object returns a handle scoped to the object stored under the given key.
	Object(key string) objectAPI
	// Objects returns an iterator over the objects in the bucket matching the given query.
	Objects(ctx context.Context, query *storage.Query) objectIteratorAPI
}
// bucketHandle implements the 'bucketAPI' interface and encapsulates the Google Storage SDK into a unit testable
// interface.
type bucketHandle struct {
	h *storage.BucketHandle // underlying SDK bucket handle
}

// Object returns a wrapped handle scoped to the object stored under the given key.
func (b bucketHandle) Object(key string) objectAPI {
	return objectHandle{h: b.h.Object(key)}
}

// Objects returns an iterator over the objects in the bucket matching the given query. The SDK iterator already
// satisfies 'objectIteratorAPI', so it is returned directly rather than wrapped.
func (b bucketHandle) Objects(ctx context.Context, query *storage.Query) objectIteratorAPI {
	return b.h.Objects(ctx, query)
}
// objectAPI is an object level API which allows interactions with an object stored in a Google cloud bucket.
type objectAPI interface {
	// Attrs fetches the object's metadata.
	Attrs(ctx context.Context) (*storage.ObjectAttrs, error)
	// Delete removes the object.
	Delete(ctx context.Context) error
	// NewRangeReader returns a reader over a byte range of the object; offset and length follow the semantics of the
	// SDK's ObjectHandle.NewRangeReader.
	NewRangeReader(ctx context.Context, offset, length int64) (readerAPI, error)
	// NewWriter returns a writer which uploads data to the object.
	NewWriter(ctx context.Context) writerAPI
	// ComposerFrom returns a composer which concatenates the given source objects into this object.
	//
	// NOTE(review): implementations in this package require srcs to be objectHandle values — see objectHandle.ComposerFrom.
	ComposerFrom(srcs ...objectAPI) composeAPI
	// CopierFrom returns a copier which copies the given source object to this object.
	CopierFrom(src objectAPI) copierAPI
	// Retryer returns a new handle configured with the given retry options; the receiver is unmodified.
	Retryer(opts ...storage.RetryOption) objectAPI
}
// objectHandle implements the 'objectAPI' interface and encapsulates the Google Storage SDK into a unit testable
// interface.
type objectHandle struct {
	h *storage.ObjectHandle // underlying SDK object handle
}
// Attrs fetches the object's metadata via the underlying SDK handle.
func (o objectHandle) Attrs(ctx context.Context) (*storage.ObjectAttrs, error) {
	return o.h.Attrs(ctx)
}
// Delete removes the object via the underlying SDK handle.
func (o objectHandle) Delete(ctx context.Context) error {
	return o.h.Delete(ctx)
}
// NewRangeReader opens a reader over a byte range of the object, wrapping the SDK reader in the package's 'readerAPI'
// so it can be mocked in tests. The offset and length arguments are passed straight through to the SDK.
func (o objectHandle) NewRangeReader(ctx context.Context, offset, length int64) (readerAPI, error) {
	sdkReader, err := o.h.NewRangeReader(ctx, offset, length)
	if err != nil {
		// Return a literal nil rather than wrapping a nil *storage.Reader, which would produce a non-nil interface value.
		return nil, err
	}
	return reader{r: sdkReader}, nil
}
// NewWriter returns a writer which uploads data to the object, with the chunk size and chunk retry deadline tuned via
// the package-level ChunkSize and ChunkRetryDeadline values.
func (o objectHandle) NewWriter(ctx context.Context) writerAPI {
	// Named 'w' (not 'writer') so the local doesn't shadow the package's 'writer' type.
	w := writer{w: o.h.NewWriter(ctx)}

	// Reduce the default chunk size from 16MiB; this avoids allocating large(er) amounts of memory for smaller uploads.
	//
	// NOTE: Chunking should not be disabled because that'll implicitly disable request retries.
	w.w.ChunkSize = ChunkSize

	// Increase the chunk retry deadline from 32s; this means chunks that fail to upload will be retried for longer. As
	// indicated in the SDK documentation:
	//
	// "The default value is 32s. Users may want to pick a longer deadline if they are using larger values for ChunkSize
	// or if they expect to have a slow or unreliable internet connection."
	//
	// NOTE: This encompasses the time taken in the attempt to upload the chunk, as an example imagine the case where we
	// timeout receiving the HTTP response headers after 30s, this leaves us only a small window to retry our request
	// and it's quite possible that retries could be cut short unexpectedly.
	w.w.ChunkRetryDeadline = ChunkRetryDeadline

	return w
}
// ComposerFrom returns a composer which will concatenate the given source objects into this object.
//
// NOTE: every source must be an objectHandle created by this package; the type assertion below panics for any other
// objectAPI implementation.
func (o objectHandle) ComposerFrom(srcs ...objectAPI) composeAPI {
	handles := make([]*storage.ObjectHandle, 0, len(srcs))
	for _, s := range srcs {
		handles = append(handles, s.(objectHandle).h)
	}
	return composer{c: o.h.ComposerFrom(handles...)}
}
// CopierFrom returns a copier which copies the given source object to this object.
//
// NOTE: src must be an objectHandle created by this package; the type assertion panics for any other objectAPI
// implementation.
func (o objectHandle) CopierFrom(src objectAPI) copierAPI {
	return copier{c: o.h.CopierFrom(src.(objectHandle).h)}
}
// Retryer returns a new handle configured with the given retry options; the receiver is left unmodified.
func (o objectHandle) Retryer(opts ...storage.RetryOption) objectAPI {
	return objectHandle{h: o.h.Retryer(opts...)}
}
// readerAPI is a range aware reader API which is used to stream object data from Google Storage.
type readerAPI interface {
	io.ReadCloser
	// Attrs returns the attributes of the object being read.
	Attrs() storage.ReaderObjectAttrs
}
// reader implements the 'readerAPI' and encapsulates the Google Storage SDK into a unit testable interface.
type reader struct {
	r *storage.Reader // underlying SDK reader
}

// Read reads object data into p via the underlying SDK reader.
func (r reader) Read(p []byte) (int, error) {
	return r.r.Read(p)
}

// Close closes the underlying SDK reader.
func (r reader) Close() error {
	return r.r.Close()
}

// Attrs returns the attributes of the object being read; exposed as a method because the SDK exposes them as a struct
// field, which cannot be expressed in an interface.
func (r reader) Attrs() storage.ReaderObjectAttrs {
	return r.r.Attrs
}
// writerAPI is a checksum aware writer API which is used to upload data to Google Storage.
type writerAPI interface {
	io.WriteCloser
	// SendMD5 records the expected MD5 checksum of the uploaded data.
	SendMD5(md5 []byte)
	// SendCRC records the expected CRC32C checksum of the uploaded data.
	SendCRC(crc uint32)
}
// writer implements the 'writerAPI' and encapsulates the Google Storage SDK into a unit testable interface.
type writer struct {
	w *storage.Writer // underlying SDK writer
}

// Write uploads bytes from p via the underlying SDK writer.
func (w writer) Write(p []byte) (int, error) {
	return w.w.Write(p)
}

// Close finalizes the upload via the underlying SDK writer.
func (w writer) Close() error {
	return w.w.Close()
}

// SendMD5 records the expected MD5 checksum on the upload's object attributes.
func (w writer) SendMD5(md5 []byte) {
	w.w.ObjectAttrs.MD5 = md5
}

// SendCRC records the expected CRC32C checksum on the upload's object attributes; SendCRC32C must also be set, as the
// SDK otherwise ignores a zero-valued CRC32C field.
func (w writer) SendCRC(crc uint32) {
	w.w.SendCRC32C = true
	w.w.ObjectAttrs.CRC32C = crc
}
// objectIteratorAPI is an object level iterator API which can be used to list objects in Google Storage.
type objectIteratorAPI interface {
	// Next returns the attributes of the next matching object.
	Next() (*storage.ObjectAttrs, error)
}
// composeAPI is an object level API which allows composing objects from up to 32 (the current maximum) individual
// objects and can be thought of as a poor man's multipart upload.
//
// NOTE: Google Storage does support resumable streaming uploads, however, the SDK doesn't expose this functionality in
// a way which we'd desire. For example, no API is exposed to save/maintain upload state to allow resuming after a
// process has died (required for resume).
type composeAPI interface {
	// Run performs the compose operation and returns the attributes of the resulting object.
	Run(ctx context.Context) (*storage.ObjectAttrs, error)
}
// composer implements the 'composeAPI' interface and encapsulates the Google Storage SDK in a unit testable interface.
type composer struct {
	c *storage.Composer // underlying SDK composer
}

// Run performs the compose operation via the underlying SDK composer.
func (c composer) Run(ctx context.Context) (*storage.ObjectAttrs, error) {
	return c.c.Run(ctx)
}
// copierAPI is an object level API which allows copying one object to another within Google Storage.
type copierAPI interface {
	// Run performs the copy operation and returns the attributes of the resulting object.
	Run(ctx context.Context) (*storage.ObjectAttrs, error)
}
// copier implements the 'copierAPI' interface and encapsulates the Google Storage SDK in a unit testable interface.
type copier struct {
	c *storage.Copier // underlying SDK copier
}

// Run performs the copy operation via the underlying SDK copier.
func (c copier) Run(ctx context.Context) (*storage.ObjectAttrs, error) {
	return c.c.Run(ctx)
}