-
Notifications
You must be signed in to change notification settings - Fork 112
/
session.go
333 lines (288 loc) · 11 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
// Copyright 2018-2023 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package upload
import (
"context"
"encoding/json"
"os"
"path/filepath"
"strconv"
"time"
tusd "github.com/tus/tusd/pkg/handler"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
)
// OcisSession extends tus upload lifecycle with postprocessing steps.
type OcisSession struct {
store OcisStore
// for now, we keep the json files in the uploads folder
info tusd.FileInfo
}
// Context returns a context with the user, logger and lockid used when initiating the upload session
func (s *OcisSession) Context(ctx context.Context) context.Context { // restore logger from file info
log, _ := logger.FromConfig(&logger.LogConf{
Output: "stderr", // TODO use config from decomposedfs
Mode: "json", // TODO use config from decomposedfs
Level: s.info.Storage["LogLevel"],
})
sub := log.With().Int("pid", os.Getpid()).Logger()
ctx = appctx.WithLogger(ctx, &sub)
ctx = ctxpkg.ContextSetLockID(ctx, s.lockID())
return ctxpkg.ContextSetUser(ctx, s.executantUser())
}
func (s *OcisSession) lockID() string {
return s.info.MetaData["lockid"]
}
func (s *OcisSession) executantUser() *userpb.User {
return &userpb.User{
Id: &userpb.UserId{
Type: userpb.UserType(userpb.UserType_value[s.info.Storage["UserType"]]),
Idp: s.info.Storage["Idp"],
OpaqueId: s.info.Storage["UserId"],
},
Username: s.info.Storage["UserName"],
}
}
// Purge deletes the upload session metadata and written binary data
func (s *OcisSession) Purge(ctx context.Context) error {
if err := os.Remove(sessionPath(s.store.root, s.info.ID)); err != nil {
return err
}
if err := os.Remove(s.binPath()); err != nil {
return err
}
return nil
}
// TouchBin creates a file to contain the binary data. It's size will be used to keep track of the tus upload offset.
func (s *OcisSession) TouchBin() error {
file, err := os.OpenFile(s.binPath(), os.O_CREATE|os.O_WRONLY, defaultFilePerm)
if err != nil {
return err
}
return file.Close()
}
// Persist writes the upload session metadata to disk
func (s *OcisSession) Persist(ctx context.Context) error {
uploadPath := sessionPath(s.store.root, s.info.ID)
// create folder structure (if needed)
if err := os.MkdirAll(filepath.Dir(uploadPath), 0700); err != nil {
return err
}
var d []byte
d, err := json.Marshal(s.info)
if err != nil {
return err
}
return os.WriteFile(uploadPath, d, 0600)
}
// ToFileInfo returns tus compatible FileInfo so the tus handler can access the upload offset
func (s *OcisSession) ToFileInfo() tusd.FileInfo {
return s.info
}
// ProviderID returns the provider id
func (s *OcisSession) ProviderID() string {
return s.info.MetaData["providerID"]
}
// SpaceID returns the space id
func (s *OcisSession) SpaceID() string {
return s.info.Storage["SpaceRoot"]
}
// NodeID returns the node id
func (s *OcisSession) NodeID() string {
return s.info.Storage["NodeId"]
}
// NodeParentID returns the nodes parent id
func (s *OcisSession) NodeParentID() string {
return s.info.Storage["NodeParentId"]
}
// NodeExists returns wether or not the node existed during InitiateUpload.
// FIXME If two requests try to write the same file they both will store a new
// random node id in the session and try to initialize a new node when
// finishing the upload. The second request will fail with an already exists
// error when trying to create the symlink for the node in the parent directory.
// A node should be created as part of InitiateUpload. When listing a directory
// we can decide if we want to skip the entry, or expose uploed progress
// information. But that is a bigger change and might involve client work.
func (s *OcisSession) NodeExists() bool {
return s.info.Storage["NodeExists"] == "true"
}
// HeaderIfMatch returns the if-match header for the upload session
func (s *OcisSession) HeaderIfMatch() string {
return s.info.MetaData["if-match"]
}
// HeaderIfNoneMatch returns the if-none-match header for the upload session
func (s *OcisSession) HeaderIfNoneMatch() string {
return s.info.MetaData["if-none-match"]
}
// HeaderIfUnmodifiedSince returns the if-unmodified-since header for the upload session
func (s *OcisSession) HeaderIfUnmodifiedSince() string {
return s.info.MetaData["if-unmodified-since"]
}
// Node returns the node for the session
func (s *OcisSession) Node(ctx context.Context) (*node.Node, error) {
return node.ReadNode(ctx, s.store.lu, s.SpaceID(), s.info.Storage["NodeId"], false, nil, true)
}
// ID returns the upload session id
func (s *OcisSession) ID() string {
return s.info.ID
}
// Filename returns the name of the node which is not the same as the name af the file being uploaded for legacy chunked uploads
func (s *OcisSession) Filename() string {
return s.info.Storage["NodeName"]
}
// Chunk returns the chunk name when a legacy chunked upload was started
func (s *OcisSession) Chunk() string {
return s.info.Storage["Chunk"]
}
// SetMetadata is used to fill the upload metadata that will be exposed to the end user
func (s *OcisSession) SetMetadata(key, value string) {
s.info.MetaData[key] = value
}
// SetStorageValue is used to set metadata only relevant for the upload session implementation
func (s *OcisSession) SetStorageValue(key, value string) {
s.info.Storage[key] = value
}
// SetSize will set the upload size of the underlying tus info.
func (s *OcisSession) SetSize(size int64) {
s.info.Size = size
}
// SetSizeIsDeferred is uset to change the SizeIsDeferred property of the underlying tus info.
func (s *OcisSession) SetSizeIsDeferred(value bool) {
s.info.SizeIsDeferred = value
}
// Dir returns the directory to which the upload is made
// TODO get rid of Dir(), whoever consumes the reference should be able to deal
// with a relative reference.
// Dir is only used to:
// - fill the Path property when emitting the UploadReady event after
// postprocessing finished. I wonder why the UploadReady contains a finished
// flag ... maybe multiple distinct events would make more sense.
// - build the reference that is passed to the FileUploaded event in the
// UploadFinishedFunc callback passed to the Upload call used for simple
// datatx put requests
//
// AFAICT only search and audit services consume the path.
// - search needs to index from the root anyway. And it only needs the most
// recent path to put it in the index. So it should already be able to deal
// with an id based reference.
// - audit on the other hand needs to log events with the path at the state of
// the event ... so it does need the full path.
//
// I think we can safely determine the path later, right before emitting the
// event. And maybe make it configurable, because only audit needs it, anyway.
func (s *OcisSession) Dir() string {
return s.info.Storage["Dir"]
}
// Size returns the upload size
func (s *OcisSession) Size() int64 {
return s.info.Size
}
// SizeDiff returns the size diff that was calculated after postprocessing
func (s *OcisSession) SizeDiff() int64 {
sizeDiff, _ := strconv.ParseInt(s.info.MetaData["sizeDiff"], 10, 64)
return sizeDiff
}
// Reference returns a reference that can be used to access the uploaded resource
func (s *OcisSession) Reference() provider.Reference {
return provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: s.info.MetaData["providerID"],
SpaceId: s.info.Storage["SpaceRoot"],
OpaqueId: s.info.Storage["NodeId"],
},
// Path is not used
}
}
// Executant returns the id of the user that initiated the upload session
func (s *OcisSession) Executant() userpb.UserId {
return userpb.UserId{
Type: userpb.UserType(userpb.UserType_value[s.info.Storage["UserType"]]),
Idp: s.info.Storage["Idp"],
OpaqueId: s.info.Storage["UserId"],
}
}
// SetExecutant is used to remember the user that initiated the upload session
func (s *OcisSession) SetExecutant(u *userpb.User) {
s.info.Storage["Idp"] = u.GetId().GetIdp()
s.info.Storage["UserId"] = u.GetId().GetOpaqueId()
s.info.Storage["UserType"] = utils.UserTypeToString(u.GetId().Type)
s.info.Storage["UserName"] = u.GetUsername()
}
// Offset returns the current upload offset
func (s *OcisSession) Offset() int64 {
return s.info.Offset
}
// SpaceOwner returns the id of the space owner
func (s *OcisSession) SpaceOwner() *userpb.UserId {
return &userpb.UserId{
// idp and type do not seem to be consumed and the node currently only stores the user id anyway
OpaqueId: s.info.Storage["SpaceOwnerOrManager"],
}
}
// Expires returns the time the upload session expires
func (s *OcisSession) Expires() time.Time {
var t time.Time
if value, ok := s.info.MetaData["expires"]; ok {
t, _ = utils.MTimeToTime(value)
}
return t
}
// MTime returns the mtime to use for the uploaded file
func (s *OcisSession) MTime() time.Time {
var t time.Time
if value, ok := s.info.MetaData["mtime"]; ok {
t, _ = utils.MTimeToTime(value)
}
return t
}
// IsProcessing returns true if all bytes have been received. The session then has entered postprocessing state.
func (s *OcisSession) IsProcessing() bool {
// We might need a more sophisticated way to determine processing status soon
return s.info.Size == s.info.Offset && s.info.MetaData["scanResult"] == ""
}
// binPath returns the path to the file storing the binary data.
func (s *OcisSession) binPath() string {
return filepath.Join(s.store.root, "uploads", s.info.ID)
}
// InitiatorID returns the id of the initiating client
func (s *OcisSession) InitiatorID() string {
return s.info.MetaData["initiatorid"]
}
// SetScanData sets virus scan data to the upload session
func (s *OcisSession) SetScanData(result string, date time.Time) {
s.info.MetaData["scanResult"] = result
s.info.MetaData["scanDate"] = date.Format(time.RFC3339)
}
// ScanData returns the virus scan data
func (s *OcisSession) ScanData() (string, time.Time) {
date := s.info.MetaData["scanDate"]
if date == "" {
return "", time.Time{}
}
d, _ := time.Parse(time.RFC3339, date)
return s.info.MetaData["scanResult"], d
}
// sessionPath returns the path to the .info file storing the file's info.
func sessionPath(root, id string) string {
return filepath.Join(root, "uploads", id+".info")
}