diff --git a/model/sharing/member.go b/model/sharing/member.go index 9e333c52291..c6696e626f0 100644 --- a/model/sharing/member.go +++ b/model/sharing/member.go @@ -79,6 +79,16 @@ func (m *Member) PrimaryName() string { return m.Email } +// InstanceHost returns the domain part of the Cozy URL of the member, which +// can be used to find the instance in CouchDB. It may includes the port. +func (m *Member) InstanceHost() string { + u, err := url.Parse(m.Instance) + if err != nil { + return "" + } + return u.Host +} + // Credentials is the struct with the secret stuff used for authentication & // authorization. type Credentials struct { diff --git a/model/sharing/upload.go b/model/sharing/upload.go index 1c8a1ec486b..44ec563e6f5 100644 --- a/model/sharing/upload.go +++ b/model/sharing/upload.go @@ -14,6 +14,7 @@ import ( "github.com/cozy/cozy-stack/client/request" "github.com/cozy/cozy-stack/model/instance" + "github.com/cozy/cozy-stack/model/instance/lifecycle" "github.com/cozy/cozy-stack/model/vfs" "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/consts" @@ -29,6 +30,10 @@ type UploadMsg struct { Errors int `json:"errors"` } +// fileCreatorWithContent is a function that can be used to create a file in +// the given VFS. The content comes from the function closure. +type fileCreatorWithContent func(fs vfs.VFS, newdoc, olddoc *vfs.FileDoc) error + // Upload starts uploading files for this sharing func (s *Sharing) Upload(inst *instance.Instance, errors int) error { mu := config.Lock().ReadWrite(inst, "sharings/"+s.SID+"/upload") @@ -318,6 +323,17 @@ func (s *Sharing) uploadFile(inst *instance.Instance, m *Member, file map[string if err != nil { return err } + + dstInstance, err := lifecycle.GetInstance(m.InstanceHost()) + if err == nil && onSameStack(inst, dstInstance) { + err := s.optimizedUploadFile(inst, dstInstance, m, fileDoc, file, resBody) + if err != nil { + inst.Logger().WithNamespace("upload"). + Warnf("optimizedUploadFile failed to upload %s to %s (%s): %s", origFileID, m.Instance, s.ID(), err) + } + return err + } + content, err := fs.OpenFile(fileDoc) if err != nil { return err @@ -350,6 +366,43 @@ func (s *Sharing) uploadFile(inst *instance.Instance, m *Member, file map[string return nil } +func onSameStack(src, dst *instance.Instance) bool { + var srcPort, dstPort string + parts := strings.SplitN(src.Domain, ":", 2) + if len(parts) > 1 { + srcPort = parts[1] + } + parts = strings.SplitN(dst.Domain, ":", 2) + if len(parts) > 1 { + dstPort = parts[1] + } + return srcPort == dstPort +} + +func (s *Sharing) optimizedUploadFile( + srcInstance, dstInstance *instance.Instance, + m *Member, + srcFile *vfs.FileDoc, + dstFile map[string]interface{}, + key KeyToUpload, +) error { + srcInstance.Logger().WithNamespace("upload"). + Debugf("optimizedUploadFile %s to %s (%s)", srcFile.ID(), m.Instance, s.ID()) + + create := func(fs vfs.VFS, newdoc, olddoc *vfs.FileDoc) error { + return fs.CopyFileFromOtherFS(newdoc, olddoc, srcInstance.VFS(), srcFile) + } + + dstSharing, err := FindSharing(dstInstance, s.ID()) + if err != nil { + return err + } + if !dstSharing.Active { + return ErrInvalidSharing + } + return dstSharing.HandleFileUpload(dstInstance, key.Key, create) +} + // FileDocWithRevisions is the struct of the payload for synchronizing a file type FileDocWithRevisions struct { *vfs.FileDoc @@ -508,8 +561,7 @@ func (s *Sharing) updateFileMetadata(inst *instance.Instance, target *FileDocWit // HandleFileUpload is used to receive a file upload when synchronizing just // the metadata was not enough. -func (s *Sharing) HandleFileUpload(inst *instance.Instance, key string, body io.ReadCloser) error { - defer body.Close() +func (s *Sharing) HandleFileUpload(inst *instance.Instance, key string, create fileCreatorWithContent) error { target, err := getStore().Get(inst, key) if err != nil { return err @@ -533,13 +585,17 @@ func (s *Sharing) HandleFileUpload(inst *instance.Instance, key string, body io. } if current == nil { - return s.UploadNewFile(inst, target, body) + return s.UploadNewFile(inst, target, create) } - return s.UploadExistingFile(inst, target, current, body) + return s.UploadExistingFile(inst, target, current, create) } // UploadNewFile is used to receive a new file. -func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevisions, body io.ReadCloser) error { +func (s *Sharing) UploadNewFile( + inst *instance.Instance, + target *FileDocWithRevisions, + create fileCreatorWithContent, +) error { inst.Logger().WithNamespace("upload").Debugf("UploadNewFile") ref := SharedRef{ Infos: make(map[string]SharedInfo), @@ -617,7 +673,7 @@ func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevi newdoc.ReferencedBy = append(newdoc.ReferencedBy, ref) } - file, err := fs.CreateFile(newdoc, nil) + err = create(fs, newdoc, nil) if errors.Is(err, os.ErrExist) { pth, errp := newdoc.Path(fs) if errp != nil { @@ -632,7 +688,7 @@ func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevi newdoc.DocName = name newdoc.ResetFullpath() } - file, err = fs.CreateFile(newdoc, nil) + err = create(fs, newdoc, nil) } if err != nil { inst.Logger().WithNamespace("upload"). @@ -640,9 +696,9 @@ func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevi return err } if s.NbFiles > 0 { - defer s.countReceivedFiles(inst) + s.countReceivedFiles(inst) } - return copyFileContent(inst, file, body) + return nil } // countReceivedFiles counts the number of files received during the initial @@ -695,7 +751,12 @@ func (s *Sharing) countReceivedFiles(inst *instance.Instance) { // than on content: a conflict on different content is resolved by a copy of // the file (which is not what we want), a conflict of name+dir_id, the higher // revision wins and it should be the good one in our case. -func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWithRevisions, newdoc *vfs.FileDoc, body io.ReadCloser) error { +func (s *Sharing) UploadExistingFile( + inst *instance.Instance, + target *FileDocWithRevisions, + newdoc *vfs.FileDoc, + create fileCreatorWithContent, +) error { inst.Logger().WithNamespace("upload").Debugf("UploadExistingFile") var ref SharedRef err := couchdb.GetDoc(inst, consts.Shared, consts.Files+"/"+target.DocID, &ref) @@ -731,7 +792,7 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit conflict := detectConflict(newdoc.DocRev, chain) switch conflict { case LostConflict: - return s.uploadLostConflict(inst, target, newdoc, body) + return s.uploadLostConflict(inst, target, newdoc, create) case WonConflict: if err = s.uploadWonConflict(inst, olddoc); err != nil { return err @@ -743,11 +804,7 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit // Easy case: only the content has changed, not its path if newdoc.DocName == olddoc.DocName && newdoc.DirID == olddoc.DirID { - file, errf := fs.CreateFile(newdoc, olddoc) - if errf != nil { - return errf - } - return copyFileContent(inst, file, body) + return create(fs, newdoc, olddoc) } stash := indexer.StashRevision(false) @@ -755,11 +812,7 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit tmpdoc.DocName = olddoc.DocName tmpdoc.DirID = olddoc.DirID tmpdoc.ResetFullpath() - file, err := fs.CreateFile(tmpdoc, olddoc) - if err != nil { - return err - } - if err = copyFileContent(inst, file, body); err != nil { + if err := create(fs, tmpdoc, olddoc); err != nil { return err } @@ -788,7 +841,12 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit // uploadLostConflict manages an upload where a file is in conflict, and the // uploaded file version goes to a new file. -func (s *Sharing) uploadLostConflict(inst *instance.Instance, target *FileDocWithRevisions, newdoc *vfs.FileDoc, body io.ReadCloser) error { +func (s *Sharing) uploadLostConflict( + inst *instance.Instance, + target *FileDocWithRevisions, + newdoc *vfs.FileDoc, + create fileCreatorWithContent, +) error { rev := target.Rev() inst.Logger().WithNamespace("upload").Debugf("uploadLostConflict %s", rev) indexer := newSharingIndexer(inst, &bulkRevs{ @@ -798,21 +856,16 @@ func (s *Sharing) uploadLostConflict(inst *instance.Instance, target *FileDocWit fs := inst.VFS().UseSharingIndexer(indexer) newdoc.DocID = conflictID(newdoc.DocID, rev) if _, err := fs.FileByID(newdoc.DocID); !errors.Is(err, os.ErrNotExist) { - if err != nil { - return err - } - body.Close() - return nil + return err } newdoc.DocName = conflictName(indexer, newdoc.DirID, newdoc.DocName, true) newdoc.DocRev = "" newdoc.ResetFullpath() - file, err := fs.CreateFile(newdoc, nil) - if err != nil { + if err := create(fs, newdoc, nil); err != nil { + inst.Logger().WithNamespace("upload").Debugf("1. loser = %#v", newdoc) return err } - inst.Logger().WithNamespace("upload").Debugf("1. loser = %#v", newdoc) - return copyFileContent(inst, file, body) + return nil } // uploadWonConflict manages an upload where a file is in conflict, and the diff --git a/model/vfs/vfs.go b/model/vfs/vfs.go index dd1cf2cdb85..d173b02b575 100644 --- a/model/vfs/vfs.go +++ b/model/vfs/vfs.go @@ -126,6 +126,11 @@ type Fs interface { // version. ImportFileVersion(version *Version, content io.ReadCloser) error + // CopyFileFromOtherFS creates or updates a file by copying the content of + // a file in another Cozy. It is used for sharings, to optimize I/O when + // two instances are on the same stack. + CopyFileFromOtherFS(olddoc, newdoc *FileDoc, srcFS Fs, srcDoc *FileDoc) error + // Fsck return the list of inconsistencies in the VFS Fsck(func(log *FsckLog), bool) (err error) CheckFilesConsistency(func(*FsckLog), bool) error diff --git a/model/vfs/vfsafero/impl.go b/model/vfs/vfsafero/impl.go index 0d54af1b9ab..7b81e16c2e5 100644 --- a/model/vfs/vfsafero/impl.go +++ b/model/vfs/vfsafero/impl.go @@ -1,3 +1,6 @@ +// Package vfsafero is the implementation of the Virtual File System by using +// afero. Afero is a library for manipulating files and directory on the local +// file system. package vfsafero import ( @@ -572,6 +575,30 @@ func (afs *aferoVFS) RevertFileVersion(doc *vfs.FileDoc, version *vfs.Version) e return nil } +func (afs *aferoVFS) CopyFileFromOtherFS( + newdoc, olddoc *vfs.FileDoc, + srcFS vfs.Fs, + srcDoc *vfs.FileDoc, +) error { + content, err := srcFS.OpenFile(srcDoc) + if err != nil { + return err + } + defer content.Close() + + fd, err := afs.CreateFile(newdoc, olddoc) + if err != nil { + return err + } + + _, err = io.Copy(fd, content) + errc := fd.Close() + if err != nil { + return err + } + return errc +} + // UpdateFileDoc overrides the indexer's one since the afero.Fs is by essence // also indexed by path. When moving a file, the index has to be moved and the // filesystem should also be updated. diff --git a/model/vfs/vfsswift/impl_v3.go b/model/vfs/vfsswift/impl_v3.go index fe70757e115..97a345f5d45 100644 --- a/model/vfs/vfsswift/impl_v3.go +++ b/model/vfs/vfsswift/impl_v3.go @@ -1,3 +1,6 @@ +// Package vfsswift is the implementation of the Virtual File System by using +// Swift from the OpenStack project. The file contents are saved in the object +// storage (Swift), and the metadata are indexed in CouchDB. package vfsswift import ( @@ -591,6 +594,101 @@ func (sfs *swiftVFSV3) RevertFileVersion(doc *vfs.FileDoc, version *vfs.Version) return sfs.Indexer.DeleteVersion(version) } +func (sfs *swiftVFSV3) CopyFileFromOtherFS( + newdoc, olddoc *vfs.FileDoc, + srcFS vfs.Fs, + srcDoc *vfs.FileDoc, +) error { + if lockerr := sfs.mu.Lock(); lockerr != nil { + return lockerr + } + defer sfs.mu.Unlock() + + newsize, maxsize, capsize, err := vfs.CheckAvailableDiskSpace(sfs, newdoc) + if err != nil { + return err + } + if newsize > maxsize { + return vfs.ErrFileTooBig + } + + newpath, err := sfs.Indexer.FilePath(newdoc) + if err != nil { + return err + } + if strings.HasPrefix(newpath, vfs.TrashDirName+"/") { + return vfs.ErrParentInTrash + } + + if olddoc == nil { + var exists bool + exists, err = sfs.Indexer.DirChildExists(newdoc.DirID, newdoc.DocName) + if err != nil { + return err + } + if exists { + return os.ErrExist + } + } + + if newdoc.DocID == "" { + uid, err := uuid.NewV7() + if err != nil { + return err + } + newdoc.DocID = uid.String() + } + + newdoc.InternalID = NewInternalID() + + srcName := MakeObjectNameV3(srcDoc.DocID, srcDoc.InternalID) + dstName := MakeObjectNameV3(newdoc.DocID, newdoc.InternalID) + srcContainer := srcFS.(*swiftVFSV3).container + if _, err := sfs.c.ObjectCopy(sfs.ctx, srcContainer, srcName, sfs.container, dstName, nil); err != nil { + return err + } + + var v *vfs.Version + if olddoc != nil { + v = vfs.NewVersion(olddoc) + err = sfs.Indexer.UpdateFileDoc(olddoc, newdoc) + } else { + err = sfs.Indexer.CreateNamedFileDoc(newdoc) + } + if err != nil { + return err + } + + if v != nil { + actionV, toClean, _ := vfs.FindVersionsToClean(sfs, newdoc.DocID, v) + if bytes.Equal(newdoc.MD5Sum, olddoc.MD5Sum) { + actionV = vfs.CleanCandidateVersion + } + if actionV == vfs.KeepCandidateVersion { + if errv := sfs.Indexer.CreateVersion(v); errv != nil { + actionV = vfs.CleanCandidateVersion + } + } + if actionV == vfs.CleanCandidateVersion { + internalID := v.DocID + if parts := strings.SplitN(v.DocID, "/", 2); len(parts) > 1 { + internalID = parts[1] + } + objName := MakeObjectNameV3(newdoc.DocID, internalID) + _ = sfs.c.ObjectDelete(sfs.ctx, sfs.container, objName) + } + for _, old := range toClean { + _ = cleanOldVersion(sfs, newdoc.DocID, old) + } + } + + if capsize > 0 && newsize >= capsize { + vfs.PushDiskQuotaAlert(sfs, true) + } + + return nil +} + // UpdateFileDoc calls the indexer UpdateFileDoc function and adds a few checks // before actually calling this method: // - locks the filesystem for writing diff --git a/tests/system/tests/sharing_moves_n_delete.rb b/tests/system/tests/sharing_moves_n_delete.rb index 7bd8cbf3af2..09f557132ef 100644 --- a/tests/system/tests/sharing_moves_n_delete.rb +++ b/tests/system/tests/sharing_moves_n_delete.rb @@ -7,18 +7,21 @@ Helpers.scenario "moves_n_delete" Helpers.start_mailhog - # Create the instance + # Create the instances inst = Instance.create name: "Alice" inst_bob = Instance.create name: "Bob" - inst_charlie = Instance.create name: "Charlie" + inst_charlie = Instance.create name: "Charlie", port: inst.stack.port # Create hierarchy folder = Folder.create inst folder.couch_id.wont_be_empty subdir = Folder.create inst, dir_id: folder.couch_id - child1 = Folder.create inst, dir_id: subdir.couch_id - child2 = Folder.create inst, dir_id: subdir.couch_id - child3 = Folder.create inst, dir_id: subdir.couch_id + childname1 = "c_#{Faker::Internet.slug}1" + childname2 = "c_#{Faker::Internet.slug}1" + childname3 = "c_#{Faker::Internet.slug}1" + child1 = Folder.create inst, dir_id: subdir.couch_id, name: childname1 + child2 = Folder.create inst, dir_id: subdir.couch_id, name: childname2 + child3 = Folder.create inst, dir_id: subdir.couch_id, name: childname3 filename1 = "#{Faker::Internet.slug}1.txt" filename2 = "#{Faker::Internet.slug}2.txt" filename3 = "#{Faker::Internet.slug}3.txt" diff --git a/web/sharings/replicator.go b/web/sharings/replicator.go index e7fe867be79..05f74e03512 100644 --- a/web/sharings/replicator.go +++ b/web/sharings/replicator.go @@ -3,9 +3,11 @@ package sharings import ( "encoding/json" "errors" + "io" "net/http" "github.com/cozy/cozy-stack/model/sharing" + "github.com/cozy/cozy-stack/model/vfs" "github.com/cozy/cozy-stack/pkg/consts" "github.com/cozy/cozy-stack/pkg/jsonapi" "github.com/cozy/cozy-stack/web/middlewares" @@ -127,7 +129,20 @@ func FileHandler(c echo.Context) error { inst.Logger().WithNamespace("replicator").Infof("Sharing was not found: %s", err) return wrapErrors(err) } - if err := s.HandleFileUpload(inst, c.Param("id"), c.Request().Body); err != nil { + + create := func(fs vfs.VFS, newdoc, olddoc *vfs.FileDoc) error { + file, err := fs.CreateFile(newdoc, olddoc) + if err != nil { + return err + } + _, err = io.Copy(file, c.Request().Body) + if cerr := file.Close(); cerr != nil && err == nil { + return cerr + } + return err + } + + if err := s.HandleFileUpload(inst, c.Param("id"), create); err != nil { inst.Logger().WithNamespace("replicator").Infof("Error on file upload: %s", err) return wrapErrors(err) }