Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve posixfs #4730

Merged
merged 5 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/unreleased/improve-posixfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Enhancement: Improve posixfs storage driver

Improve the posixfs storage driver by fixing several issues and adding missing features.

https://github.com/cs3org/reva/pull/4730
https://github.com/cs3org/reva/pull/4719
https://github.com/cs3org/reva/pull/4708
https://github.com/cs3org/reva/pull/4562
65 changes: 0 additions & 65 deletions pkg/storage/fs/posix/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"path/filepath"
"strings"
"syscall"

user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -77,10 +76,6 @@ func New(b metadata.Backend, um usermapper.Mapper, o *options.Options) *Lookup {
userMapper: um,
}

go func() {
_ = lu.WarmupIDCache(o.Root)
}()

return lu
}

Expand All @@ -94,66 +89,6 @@ func (lu *Lookup) GetCachedID(ctx context.Context, spaceID, nodeID string) (stri
return lu.IDCache.Get(ctx, spaceID, nodeID)
}

// WarmupIDCache warms up the id cache
func (lu *Lookup) WarmupIDCache(root string) error {
spaceID := []byte("")

scopeSpace := func(spaceCandidate string) error {
if !lu.Options.UseSpaceGroups {
return nil
}

// set the uid and gid for the space
fi, err := os.Stat(spaceCandidate)
if err != nil {
return err
}
sys := fi.Sys().(*syscall.Stat_t)
gid := int(sys.Gid)
_, err = lu.userMapper.ScopeUserByIds(-1, gid)
return err
}

return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

attribs, err := lu.metadataBackend.All(context.Background(), path)
if err == nil {
nodeSpaceID := attribs[prefixes.SpaceIDAttr]
if len(nodeSpaceID) > 0 {
spaceID = nodeSpaceID

err = scopeSpace(path)
if err != nil {
return err
}
} else {
// try to find space
spaceCandidate := path
for strings.HasPrefix(spaceCandidate, lu.Options.Root) {
spaceID, err = lu.MetadataBackend().Get(context.Background(), spaceCandidate, prefixes.SpaceIDAttr)
if err == nil {
err = scopeSpace(path)
if err != nil {
return err
}
break
}
spaceCandidate = filepath.Dir(spaceCandidate)
}
}

id, ok := attribs[prefixes.IDAttr]
if ok && len(spaceID) > 0 {
_ = lu.IDCache.Set(context.Background(), string(spaceID), string(id), path)
}
}
return nil
})
}

// NodeFromPath returns the node for the given path
func (lu *Lookup) NodeIDFromParentAndName(ctx context.Context, parent *node.Node, name string) (string, error) {
id, err := lu.metadataBackend.Get(ctx, filepath.Join(parent.InternalPath(), name), prefixes.IDAttr)
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/fs/posix/lookup/store_idcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func NewStoreIDCache(o *options.Options) *StoreIDCache {
return &StoreIDCache{
cache: store.Create(
store.Store(o.IDCache.Store),
store.TTL(o.IDCache.TTL),
store.Size(o.IDCache.Size),
microstore.Nodes(o.IDCache.Nodes...),
microstore.Database(o.IDCache.Database),
Expand Down
65 changes: 65 additions & 0 deletions pkg/storage/fs/posix/tree/assimilation.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,68 @@ assimilate:

return fi, nil
}

// WarmupIDCache warms up the id cache
func (t *Tree) WarmupIDCache(root string, assimilate bool) error {
spaceID := []byte("")

scopeSpace := func(spaceCandidate string) error {
if !t.options.UseSpaceGroups {
return nil
}

// set the uid and gid for the space
fi, err := os.Stat(spaceCandidate)
if err != nil {
return err
}
sys := fi.Sys().(*syscall.Stat_t)
gid := int(sys.Gid)
_, err = t.userMapper.ScopeUserByIds(-1, gid)
return err
}

return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

attribs, err := t.lookup.MetadataBackend().All(context.Background(), path)
if err == nil {
nodeSpaceID := attribs[prefixes.SpaceIDAttr]
if len(nodeSpaceID) > 0 {
spaceID = nodeSpaceID

err = scopeSpace(path)
if err != nil {
return err
}
} else {
// try to find space
spaceCandidate := path
for strings.HasPrefix(spaceCandidate, t.options.Root) {
spaceID, err = t.lookup.MetadataBackend().Get(context.Background(), spaceCandidate, prefixes.SpaceIDAttr)
if err == nil {
err = scopeSpace(path)
if err != nil {
return err
}
break
}
spaceCandidate = filepath.Dir(spaceCandidate)
}
}
if len(spaceID) == 0 {
return nil // no space found
}

id, ok := attribs[prefixes.IDAttr]
if ok {
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path)
} else if assimilate {
_ = t.Scan(path, false)
}
}
return nil
})
}
4 changes: 1 addition & 3 deletions pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"strconv"
"time"

"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
)

type GpfsFileAuditLoggingWatcher struct {
Expand Down Expand Up @@ -72,7 +70,7 @@ start:
case "RENAME":
go func() {
_ = w.tree.Scan(ev.Path, true)
_ = w.tree.lookup.(*lookup.Lookup).WarmupIDCache(ev.Path)
_ = w.tree.WarmupIDCache(ev.Path, false)
}()
}
case io.EOF:
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"strings"

"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
kafka "github.com/segmentio/kafka-go"
)

Expand Down Expand Up @@ -57,7 +56,7 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
case strings.Contains(lwev.Event, "IN_MOVED_TO"):
go func() {
_ = w.tree.Scan(lwev.Path, true)
_ = w.tree.lookup.(*lookup.Lookup).WarmupIDCache(lwev.Path)
_ = w.tree.WarmupIDCache(lwev.Path, false)
}()
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/fs/posix/tree/inotifywatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strings"

"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/pablodz/inotifywaitgo/inotifywaitgo"
)

Expand Down Expand Up @@ -52,7 +51,7 @@ func (iw *InotifyWatcher) Watch(path string) {
case inotifywaitgo.MOVED_TO:
go func() {
_ = iw.tree.Scan(event.Filename, true)
_ = iw.tree.lookup.(*lookup.Lookup).WarmupIDCache(event.Filename)
_ = iw.tree.WarmupIDCache(event.Filename, false)
}()
case inotifywaitgo.CLOSE_WRITE:
go func() { _ = iw.tree.Scan(event.Filename, true) }()
Expand Down
24 changes: 20 additions & 4 deletions pkg/storage/fs/posix/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strings"
"time"

"github.com/gofrs/flock"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -133,10 +134,25 @@ func New(lu node.PathLookup, bs Blobstore, um usermapper.Mapper, o *options.Opti
}

// Start watching for fs events and put them into the queue
go t.watcher.Watch(watchPath)
go func() {
fileLock := flock.New(filepath.Join(o.Root, ".primary.lock"))
locked, err := fileLock.TryLock()
if err != nil {
log.Err(err).Msg("could not acquire primary lock")
return
}
if !locked {
log.Err(err).Msg("watcher is already locked")
return
}
log.Debug().Msg("acquired primary lock")

// Handle queued fs events
go t.workScanQueue()
go t.watcher.Watch(watchPath)
go t.workScanQueue()
go func() {
_ = t.WarmupIDCache(o.Root, true)
}()
}()

return t, nil
}
Expand Down Expand Up @@ -308,7 +324,7 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
_ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name))
// update id cache for the moved subtree
if oldNode.IsDir(ctx) {
err = t.lookup.(*lookup.Lookup).WarmupIDCache(filepath.Join(newNode.ParentPath(), newNode.Name))
err = t.WarmupIDCache(filepath.Join(newNode.ParentPath(), newNode.Name), false)
if err != nil {
return err
}
Expand Down
Loading