// Source: object.go from the putio backend of a fork of rclone/rclone.

package putio
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"time"
"github.com/artpar/rclone/fs"
"github.com/artpar/rclone/fs/fserrors"
"github.com/artpar/rclone/fs/hash"
"github.com/putdotio/go-putio/putio"
)
// Object describes a Putio object
//
// It pairs the remote path of a file with the putio.File metadata cached
// for it.
//
// Putio Objects always have full metadata
type Object struct {
fs *Fs // what this object is part of
// file is the cached Putio entry; it is nil until setMetadataFromEntry
// has been called.
file *putio.File
remote string // The remote path
// modtime is copied from the entry's UpdatedAt when metadata is set and
// overwritten by SetModTime.
modtime time.Time
}
// NewObject looks up the object stored at remote.
//
// The metadata is fetched immediately, so a failed lookup (for example
// fs.ErrorObjectNotFound when nothing exists at remote) is reported here
// and no object is returned.
func (f *Fs) NewObject(ctx context.Context, remote string) (o fs.Object, err error) {
	// defer log.Trace(f, "remote=%v", remote)("o=%+v, err=%v", &o, &err)
	object := &Object{remote: remote, fs: f}
	if err = object.readEntryAndSetMetadata(ctx); err != nil {
		return nil, err
	}
	return object, nil
}
// newObjectWithInfo builds an Object for remote from an already fetched
// putio.File instead of looking the entry up again.
//
// The returned error is whatever setMetadataFromEntry reports; on error no
// object is returned.
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info putio.File) (o fs.Object, err error) {
	// defer log.Trace(f, "remote=%v, info=+v", remote, &info)("o=%+v, err=%v", &o, &err)
	object := &Object{remote: remote, fs: f}
	if err = object.setMetadataFromEntry(info); err != nil {
		return nil, err
	}
	return object, nil
}
// Fs returns the Fs this object is part of, exposed as an fs.Info.
func (o *Object) Fs() fs.Info {
	parent := o.fs
	return parent
}
// String returns the remote path of the object, or "<nil>" when called on
// a nil *Object.
func (o *Object) String() string {
	if o != nil {
		return o.remote
	}
	return "<nil>"
}
// Remote returns the remote path the object was created with.
func (o *Object) Remote() string {
	remote := o.remote
	return remote
}
// Hash returns the CRC32 checksum recorded in the object's Putio metadata.
//
// CRC32 is the only supported hash type; any other type yields
// hash.ErrUnsupported. The metadata is fetched first if it has not been
// loaded yet, and a failure to do so is returned wrapped.
func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
	if t != hash.CRC32 {
		return "", hash.ErrUnsupported
	}
	if err := o.readEntryAndSetMetadata(ctx); err != nil {
		return "", fmt.Errorf("failed to read hash from metadata: %w", err)
	}
	return o.file.CRC32, nil
}
// Size returns the size of the object in bytes as recorded in the cached
// metadata, or 0 when no metadata has been loaded.
func (o *Object) Size() int64 {
	if o.file != nil {
		return o.file.Size
	}
	return 0
}
// ID returns the Putio file ID formatted with itoa, or "" when no metadata
// has been loaded and the ID is therefore unknown.
func (o *Object) ID() string {
	if o.file != nil {
		return itoa(o.file.ID)
	}
	return ""
}
// MimeType returns the content type stored in the object's metadata.
//
// The metadata is fetched if it has not been loaded yet; when that fails
// the error is dropped and "" is returned.
func (o *Object) MimeType(ctx context.Context) string {
	if err := o.readEntryAndSetMetadata(ctx); err != nil {
		return ""
	}
	return o.file.ContentType
}
// setMetadataFromEntry caches info on the object and takes the
// modification time from its UpdatedAt field.
//
// info is received by value, so the object keeps a pointer to its own copy
// rather than to the caller's struct. The returned error is always nil.
func (o *Object) setMetadataFromEntry(info putio.File) error {
	o.modtime = info.UpdatedAt.Time
	o.file = &info
	return nil
}
// readEntry fetches the putio.File describing this object.
//
// The remote path is split into parent directory ID and leaf name via the
// directory cache, then the parent's child with that (encoded) name is
// requested through the pacer.
//
// Errors:
//   - fs.ErrorObjectNotFound if the parent directory is unknown or the API
//     answers 404
//   - fs.ErrorIsDir if the entry found is a directory
//   - any other lookup or request error as returned
func (o *Object) readEntry(ctx context.Context) (f *putio.File, err error) {
	// defer log.Trace(o, "")("f=%+v, err=%v", f, &err)
	leaf, directoryID, err := o.fs.dirCache.FindPath(ctx, o.remote, false)
	switch {
	case err == fs.ErrorDirNotFound:
		// A missing parent means the object cannot exist either
		return nil, fs.ErrorObjectNotFound
	case err != nil:
		return nil, err
	}

	var resp struct {
		File putio.File `json:"file"`
	}
	err = o.fs.pacer.Call(func() (bool, error) {
		// fs.Debugf(o, "requesting child. directoryID: %s, name: %s", directoryID, leaf)
		endpoint := "/v2/files/" + directoryID + "/child?name=" + url.QueryEscape(o.fs.opt.Enc.FromStandardName(leaf))
		req, reqErr := o.fs.client.NewRequest(ctx, "GET", endpoint, nil)
		if reqErr != nil {
			return false, reqErr
		}
		_, doErr := o.fs.client.Do(req, &resp)
		// A 404 is a definitive answer, not something to retry
		if perr, ok := doErr.(*putio.ErrorResponse); ok && perr.Response.StatusCode == 404 {
			return false, fs.ErrorObjectNotFound
		}
		return shouldRetry(ctx, doErr)
	})
	if err != nil {
		return nil, err
	}

	entry := &resp.File
	if entry.IsDir() {
		return nil, fs.ErrorIsDir
	}
	return entry, nil
}
// readEntryAndSetMetadata loads the object's metadata on first use.
//
// It does nothing when metadata is already cached; otherwise it reads the
// entry and stores it on the object, returning any error from either step.
func (o *Object) readEntryAndSetMetadata(ctx context.Context) error {
	if o.file == nil {
		entry, err := o.readEntry(ctx)
		if err != nil {
			return err
		}
		return o.setMetadataFromEntry(*entry)
	}
	return nil
}
// remotePath returns the object's remote joined onto the root of its Fs.
func (o *Object) remotePath() string {
	root, remote := o.fs.root, o.remote
	return path.Join(root, remote)
}
// ModTime returns the modification time of the object.
//
// A cached non-zero time is returned directly. Otherwise the metadata is
// loaded first; if that fails the failure is logged at debug level and the
// current time is returned instead.
func (o *Object) ModTime(ctx context.Context) time.Time {
	if !o.modtime.IsZero() {
		return o.modtime
	}
	if err := o.readEntryAndSetMetadata(ctx); err != nil {
		fs.Debugf(o, "Failed to read metadata: %v", err)
		return time.Now()
	}
	return o.modtime
}
// SetModTime sets the modification time of the object on Putio.
//
// It posts to the files/touch endpoint with the file ID and modTime
// formatted as RFC 3339, and on success updates the cached modification
// time and metadata to match.
//
// The file ID comes from the object's metadata, so the metadata is loaded
// first if it is not cached yet; an error doing so is returned unchanged,
// as are errors building or sending the request.
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) (err error) {
	// defer log.Trace(o, "modTime=%v", modTime.String())("err=%v", &err)

	// Guarantees o.file is non-nil below rather than panicking on o.file.ID
	if err = o.readEntryAndSetMetadata(ctx); err != nil {
		return err
	}

	query := "file_id=" + strconv.FormatInt(o.file.ID, 10) + "&updated_at=" + url.QueryEscape(modTime.Format(time.RFC3339))
	req, err := o.fs.client.NewRequest(ctx, "POST", "/v2/files/touch?"+query, nil)
	if err != nil {
		return err
	}
	// fs.Debugf(o, "setting modtime: %s", modTime.String())
	if _, err = o.fs.client.Do(req, nil); err != nil {
		return err
	}

	// Keep the cached view in step with what was just written
	o.modtime = modTime
	o.file.UpdatedAt.Time = modTime
	return nil
}
// Storable reports whether the object can be stored; for Putio objects
// this is always true.
func (o *Object) Storable() bool {
	const storable = true
	return storable
}
// Open opens the object for reading.
//
// It first asks Putio for a storage URL for the file, then issues a GET to
// that URL with the headers derived from options (for example range
// requests) applied on top of the User-Agent. Both steps run through the
// pacer so they are retried according to shouldRetry.
//
// On success the response body is returned and the caller must close it.
// On failure no body is returned; a *putio.ErrorResponse with a 4xx status
// is wrapped with fserrors.NoRetryError, other errors are returned as is.
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
	// defer log.Trace(o, "")("err=%v", &err)

	// The file ID comes from the metadata, so make sure it is loaded rather
	// than panicking on a nil o.file
	if err = o.readEntryAndSetMetadata(ctx); err != nil {
		return nil, err
	}

	var storageURL string
	err = o.fs.pacer.Call(func() (bool, error) {
		var urlErr error
		storageURL, urlErr = o.fs.client.Files.URL(ctx, o.file.ID, true)
		return shouldRetry(ctx, urlErr)
	})
	if err != nil {
		return nil, err
	}

	var resp *http.Response
	headers := fs.OpenOptionHeaders(options)
	err = o.fs.pacer.Call(func() (bool, error) {
		// Forget the response of any earlier attempt; its body has
		// already been closed below
		resp = nil
		req, reqErr := http.NewRequestWithContext(ctx, http.MethodGet, storageURL, nil)
		if reqErr != nil {
			return shouldRetry(ctx, reqErr)
		}
		req.Header.Set("User-Agent", o.fs.client.UserAgent)
		// merge headers with extra headers
		for header, value := range headers {
			req.Header.Set(header, value)
		}
		// fs.Debugf(o, "opening file: id=%d", o.file.ID)
		var doErr error
		resp, doErr = o.fs.httpClient.Do(req)
		if doErr != nil {
			return shouldRetry(ctx, doErr)
		}
		if statusErr := checkStatusCode(resp, 200, 206); statusErr != nil {
			// Let shouldRetry inspect the error first, then release the
			// body so a rejected attempt does not leak its connection
			retry, retryErr := shouldRetry(ctx, statusErr)
			_ = resp.Body.Close()
			return retry, retryErr
		}
		return false, nil
	})
	if perr, ok := err.(*putio.ErrorResponse); ok && perr.Response != nil && perr.Response.StatusCode >= 400 && perr.Response.StatusCode <= 499 {
		// Client errors will not improve on retry
		return nil, fserrors.NoRetryError(err)
	}
	if err != nil {
		return nil, err
	}
	return resp.Body, nil
}
// Update replaces the contents of the object with the data read from in.
//
// File names matching ignoredFiles are not uploaded: this is logged and
// nil is returned. Otherwise the existing file is removed first and the new
// data is uploaded to the same remote with putUnchecked; on success the
// receiver is overwritten with the newly created object.
//
// Because the removal happens before the upload, an error from the upload
// leaves the old file already deleted.
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
	// defer log.Trace(o, "src=%+v", src)("err=%v", &err)
	if fullPath := o.remotePath(); ignoredFiles.MatchString(fullPath) {
		fs.Logf(o, "File name disallowed - not uploading")
		return nil
	}
	if err = o.Remove(ctx); err != nil {
		return err
	}
	uploaded, err := o.fs.putUnchecked(ctx, in, src, o.remote, options...)
	if err != nil {
		return err
	}
	// Adopt the metadata of the freshly uploaded file
	*o = *(uploaded.(*Object))
	return nil
}
// Remove deletes the object's file from Putio.
//
// The delete is addressed by file ID, so the metadata is loaded first if
// it is not cached yet; an error doing so is returned unchanged. The delete
// call itself runs through the pacer and is retried according to
// shouldRetry.
func (o *Object) Remove(ctx context.Context) (err error) {
	// defer log.Trace(o, "")("err=%v", &err)

	// Guarantees o.file is non-nil below rather than panicking on o.file.ID
	if err = o.readEntryAndSetMetadata(ctx); err != nil {
		return err
	}
	return o.fs.pacer.Call(func() (bool, error) {
		// fs.Debugf(o, "removing file: id=%d", o.file.ID)
		deleteErr := o.fs.client.Files.Delete(ctx, o.file.ID)
		return shouldRetry(ctx, deleteErr)
	})
}