Skip to content

Commit

Permalink
calculate sizediff on demand
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Sep 7, 2023
1 parent 205e17d commit 6b85fe4
Show file tree
Hide file tree
Showing 9 changed files with 660 additions and 1,201 deletions.
74 changes: 56 additions & 18 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
"github.com/cs3org/reva/v2/pkg/storagespace"
Expand Down Expand Up @@ -285,9 +286,18 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
keepUpload bool
)

n, err := node.ReadNode(ctx, fs.lu, info.MetaData[tus.CS3Prefix+"SpaceRoot"], info.MetaData[tus.CS3Prefix+"NodeId"], false, nil, true)
var sizeDiff int64
// propagate sizeDiff after failed postprocessing

n, err := upload.ReadNode(ctx, fs.lu, info)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
log.Error().Err(err).Str("uploadID", ev.UploadID).
Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]).
Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]).
Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]).
Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]).
Str("name", info.MetaData[tus.CS3Prefix+"filename"]).
Msg("could not read revision node")
continue
}

Expand All @@ -304,6 +314,12 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
keepUpload = true // should we keep the upload when assembling failed?
failed = true
}
sizeDiff, err = upload.SetNodeToRevision(ctx, fs.lu, n, info.MetaData[tus.CS3Prefix+"RevisionTime"])
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload")
keepUpload = true // should we keep the upload when assembling failed?
failed = true
}
case events.PPOutcomeDelete:
failed = true
}
Expand All @@ -319,24 +335,33 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {

now := time.Now()
if failed {
sizeDiff, err := strconv.ParseInt(info.MetaData[tus.CS3Prefix+"sizeDiff"], 10, 64)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("info", info).Msg("could not parse sizediff")
continue
}
// FIXME how can we calculate the size diff now?
/*
sizeDiff, err := strconv.ParseInt(info.MetaData[tus.CS3Prefix+"sizeDiff"], 10, 64)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("info", info).Msg("could not parse sizediff")
continue
}
*/
// propagate sizeDiff after failed postprocessing
if err := fs.tp.Propagate(ctx, n, -sizeDiff); err != nil {
//if err := fs.tp.Propagate(ctx, revisionNode, -sizeDiff); err != nil {

Check failure on line 347 in pkg/storage/utils/decomposedfs/decomposedfs.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)
if err := fs.tp.Propagate(ctx, n, 0); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change")
}

} else if p := getParent(); p != nil {
// update parent tmtime to propagate etag change after successful postprocessing
_ = p.SetTMTime(ctx, &now)
if err := fs.tp.Propagate(ctx, p, 0); err != nil {
//if err := fs.tp.Propagate(ctx, p, 0); err != nil {

Check failure on line 355 in pkg/storage/utils/decomposedfs/decomposedfs.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)
if err := fs.tp.Propagate(ctx, p, sizeDiff); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change")
}
}

fs.Cleanup(ctx, n, info, failed, keepUpload)
fs.Cleanup(ctx, n, info.ID, info.MetaData[tus.CS3Prefix+"versionsPath"], failed)
if !keepUpload {
info.StopUpload()
}

// remove cache entry in gateway
fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
Expand All @@ -363,7 +388,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
OpaqueId: info.MetaData[tus.CS3Prefix+"SpaceRoot"],
},
// FIXME this seems wrong, path is not really relative to space root
// actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root...
// actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root so soarch can index the path
// hm is that robust? what if the file is moved? shouldn't we store the parent id, then?
Path: utils.MakeRelativePath(filepath.Join(info.MetaData[tus.CS3Prefix+"dir"], info.MetaData[tus.CS3Prefix+"filename"])),
},
Expand All @@ -385,11 +410,18 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue // NOTE: since we can't get the upload, we can't restart postprocessing
}

n, err := node.ReadNode(ctx, fs.lu, info.MetaData[tus.CS3Prefix+"SpaceRoot"], info.MetaData[tus.CS3Prefix+"NodeId"], false, nil, true)
n, err := upload.ReadNode(ctx, fs.lu, info)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
log.Error().Err(err).Str("uploadID", ev.UploadID).
Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]).
Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]).
Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]).
Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]).
Str("name", info.MetaData[tus.CS3Prefix+"filename"]).
Msg("could not read revision node")
continue
}

s, err := fs.downloadURL(ctx, ev.UploadID)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url")
Expand All @@ -402,7 +434,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
SpaceOwner: n.SpaceOwnerOrManager(ctx),
ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead?
ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID},
Filename: info.MetaData[tus.CS3Prefix+"NodeName"],
Filename: info.MetaData[tus.CS3Prefix+"filename"],
Filesize: uint64(info.Size),
}); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event")
Expand Down Expand Up @@ -513,13 +545,19 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue
}

no, err := node.ReadNode(ctx, fs.lu, info.Storage["_SpaceRoot"], info.Storage["_NodeId"], false, nil, false)
// scan data should be set on the node revision not the node ... then when postprocessing finishes we need to copy the state to the node

n, err = upload.ReadNode(ctx, fs.lu, info)
if err != nil {
log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan")
log.Error().Err(err).Str("uploadID", ev.UploadID).
Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]).
Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]).
Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]).
Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]).
Str("name", info.MetaData[tus.CS3Prefix+"filename"]).
Msg("could not read revision node")
continue
}

n = no
}

if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil {
Expand Down
32 changes: 31 additions & 1 deletion pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func New(spaceID, id, parentID, name string, blobsize int64, blobID string, t pr
if blobID == "" {
blobID = uuid.New().String()
}
// hm but dirs have no blob id
return &Node{
SpaceID: spaceID,
ID: id,
Expand All @@ -127,6 +128,35 @@ func New(spaceID, id, parentID, name string, blobsize int64, blobID string, t pr
}
}

func (n *Node) ReadRevision(ctx context.Context, revision string) (*Node, error) {

rn := &Node{
SpaceID: n.SpaceID,
ID: n.ID + RevisionIDDelimiter + revision,
ParentID: n.ParentID,
Name: n.Name,
owner: n.owner,
lu: n.lu,
nodeType: n.nodeType,
}
attrs, err := rn.Xattrs(ctx)
switch {
case metadata.IsNotExist(err):
return rn, nil // swallow not found, the node defaults to exists = false
case err != nil:
return nil, err
}
rn.Exists = true

rn.BlobID = attrs.String(prefixes.BlobIDAttr)
rn.Blobsize, err = attrs.Int64(prefixes.BlobsizeAttr)
if err != nil {
return nil, err
}

return rn, nil
}

// Type returns the node's resource type
func (n *Node) Type(ctx context.Context) provider.ResourceType {
if n.nodeType != nil {
Expand Down Expand Up @@ -895,7 +925,7 @@ func (n *Node) GetTMTime(ctx context.Context) (time.Time, error) {
// GetMTime reads the mtime from the extended attributes, falling back to disk
func (n *Node) GetMTime(ctx context.Context) (time.Time, error) {
b, err := n.XattrString(ctx, prefixes.MTimeAttr)
if err != nil {
if err != nil || len(b) == 0 {
fi, err := os.Lstat(n.InternalPath())
if err != nil {
return time.Time{}, err
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/utils/decomposedfs/testhelpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cs3org/reva/v2/pkg/store"
"github.com/google/uuid"
"github.com/stretchr/testify/mock"
"github.com/tus/tusd/pkg/filestore"
"google.golang.org/grpc"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
Expand Down Expand Up @@ -89,6 +90,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) {
if err != nil {
return nil, err
}
dataStore := filestore.New(filepath.Join(tmpRoot, "uploads"))
defaultConfig := map[string]interface{}{
"root": tmpRoot,
"treetime_accounting": true,
Expand Down Expand Up @@ -170,7 +172,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) {

bs := &treemocks.Blobstore{}
tree := tree.New(lu, bs, o, store.Create())
fs, err := decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, permissionsSelector), tree, nil)
fs, err := decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, permissionsSelector), tree, nil, dataStore)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro

child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n.SpaceRoot, true)
if err != nil {
return err
continue
}

// prevent listing denied resources
Expand Down
Loading

0 comments on commit 6b85fe4

Please sign in to comment.