Skip to content

Commit

Permalink
cephfs: initial implementation of recycle bin functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
glpatcern committed Jun 5, 2024
1 parent d0695a4 commit 83d0ea6
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 13 deletions.
136 changes: 124 additions & 12 deletions pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
goceph "github.com/ceph/go-ceph/cephfs"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typepb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage"
Expand Down Expand Up @@ -171,6 +172,21 @@ func (fs *cephfs) CreateDir(ctx context.Context, ref *provider.Reference) error
return getRevaError(err)
}

func getRecycleTargetFromPath(path string, recyclePath string, recyclePathDepth int) (string, error) {
// Tokenize the given (absolute) path
components := strings.Split(filepath.Clean(string(filepath.Separator)+path), string(filepath.Separator))
if recyclePathDepth > len(components)-1 {
return "", errors.New("path is too short")
}

// And construct the target by injecting the recyclePath at the required depth
var target []string = []string{string(filepath.Separator)}
target = append(target, components[:recyclePathDepth+1]...)
target = append(target, recyclePath, time.Now().Format("2006/01/02"))
target = append(target, components[recyclePathDepth+1:]...)
return filepath.Join(target...), nil
}

func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err error) {
var path string
user := fs.makeUser(ctx)
Expand All @@ -180,10 +196,17 @@ func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err erro
}

user.op(func(cv *cacheVal) {
if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory {
err = cv.mount.RemoveDir(path)
if fs.conf.RecyclePath != "" {
// Recycle bin is configured, move to recycle as opposed to unlink
targetPath, err := getRecycleTargetFromPath(path, fs.conf.RecyclePath, fs.conf.RecyclePathDepth)
if err == nil {
err = cv.mount.Rename(path, targetPath)
}
} else {
if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory {
err = cv.mount.RemoveDir(path)
}
}

//TODO(tmourati): Add entry id logic
})

Expand Down Expand Up @@ -597,24 +620,113 @@ func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error
return getRevaError(err)
}

func (fs *cephfs) EmptyRecycle(ctx context.Context) error {
return errtypes.NotSupported("unimplemented")
}

func (fs *cephfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (r *provider.CreateStorageSpaceResponse, err error) {
return nil, errtypes.NotSupported("unimplemented")
func (fs *cephfs) listDeletedEntries(ctx context.Context, maxentries int, basePath string, from, to time.Time) (res []*provider.RecycleItem, err error) {
user := fs.makeUser(ctx)
res := []*provider.RecycleItem{}
count := 0
rootRecyclePath := filepath.Join(basePath, fs.conf.RecyclePath)
for d := to; !d.Before(from); d = d.AddDate(0, 0, -1) {

user.op(func(cv *cacheVal) {
var dir *goceph.Directory
if dir, err = cv.mount.OpenDir(filepath.Join(rootRecyclePath, d.Format("2006/01/02"))); err != nil {
return
}
defer closeDir(dir)

var entry *goceph.DirEntryPlus
for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) {
//TODO(lopresti) validate content of entry.Name() here.
targetPath := filepath.Join(basePath, entry.Name())
stat := entry.Statx()
res = append(res, &provider.RecycleItem{
Ref: &provider.Reference{Path: targetPath},
Key: filepath.Join(rootRecyclePath, targetPath),
Size: stat.Size,
DeletionTime: &typesv1beta1.Timestamp{
Seconds: uint64(stat.Mtime.Sec),
Nanos: uint32(stat.Mtime.Nsec),
},
})

count += 1
if count > maxentries {
err = errtypes.BadRequest("list too long")
return
}
}
})
}
return res, err
}

func (fs *cephfs) ListRecycle(ctx context.Context, basePath, key, relativePath string, from, to *typepb.Timestamp) ([]*provider.RecycleItem, error) {
return nil, errtypes.NotSupported("unimplemented")
md, err := fs.GetMD(ctx, &provider.Reference{Path: basePath}, nil)
if err != nil {
return nil, err
}
if !md.PermissionSet.ListRecycle {
return nil, errtypes.PermissionDenied("cephfs: user doesn't have permissions to restore recycled items")
}

var dateFrom, dateTo time.Time
if from != nil && to != nil {
dateFrom = time.Unix(int64(from.Seconds), 0)
dateTo = time.Unix(int64(to.Seconds), 0)
if dateFrom.AddDate(0, 0, fs.conf.MaxDaysInRecycleList).Before(dateTo) {
return nil, errtypes.BadRequest("cephfs: too many days requested in listing the recycle bin")
}
} else {
// if no date range was given, list up to two days ago
dateTo = time.Now()
dateFrom = dateTo.AddDate(0, 0, -2)
}

sublog := appctx.GetLogger(ctx).With().Logger()
sublog.Debug().Time("from", dateFrom).Time("to", dateTo).Msg("executing ListDeletedEntries")
recycleEntries, err := fs.listDeletedEntries(ctx, fs.conf.MaxRecycleEntries, basePath, dateFrom, dateTo)
if err != nil {
switch err.(type) {
case errtypes.IsBadRequest:
return nil, errtypes.BadRequest("cephfs: too many entries found in listing the recycle bin")
default:
return nil, errors.Wrap(err, "cephfs: error listing deleted entries")
}
}
return recycleEntries, nil
}

func (fs *cephfs) RestoreRecycleItem(ctx context.Context, basePath, key, relativePath string, restoreRef *provider.Reference) error {
return errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
md, err := fs.GetMD(ctx, &provider.Reference{Path: basePath}, nil)
if err != nil {
return err
}
if !md.PermissionSet.RestoreRecycleItem {
return errtypes.PermissionDenied("eosfs: user doesn't have permissions to restore recycled items")
}

user.op(func(cv *cacheVal) {
//TODO(lopresti) validate content of basePath and relativePath. Key is expected to contain the recycled path
if err = cv.mount.Rename(key, filepath.Join(basePath, relativePath)); err != nil {
return
}
//TODO(tmourati): Add entry id logic, handle already moved file error
})

return getRevaError(err)
}

func (fs *cephfs) PurgeRecycleItem(ctx context.Context, basePath, key, relativePath string) error {
return errtypes.NotSupported("unimplemented")
return errtypes.NotSupported("cephfs: operation not supported")
}

func (fs *cephfs) EmptyRecycle(ctx context.Context) error {
return errtypes.NotSupported("cephfs: operation not supported")
}

func (fs *cephfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (r *provider.CreateStorageSpaceResponse, err error) {
return nil, errtypes.NotSupported("unimplemented")
}

func (fs *cephfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
Expand Down
25 changes: 24 additions & 1 deletion pkg/storage/fs/cephfs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,19 @@ type Options struct {
DirPerms uint32 `mapstructure:"dir_perms"`
FilePerms uint32 `mapstructure:"file_perms"`
UserQuotaBytes uint64 `mapstructure:"user_quota_bytes"`
HiddenDirs map[string]bool
// Path of the recycle bin. If empty, recycling is disabled.
RecyclePath string `mapstructure:"recycle_path"`
// Depth of the Recycle bin location, that is after how many path components
// the recycle path is located: this allows supporting recycles such as
// /top-level/s/space/.recycle with a depth = 3. Defaults to 0.
RecyclePathDepth int `mapstructure:"recycle_path_depth"`
// Maximum entries count a ListRecycle call may return: if exceeded, ListRecycle
// will return a BadRequest error
MaxRecycleEntries int `mapstructure:"max_recycle_entries"`
// Maximum time span in days a ListRecycle call may return: if exceeded, ListRecycle
// will override the "to" date with "from" + this value
MaxDaysInRecycleList int `mapstructure:"max_days_in_recycle_list"`
HiddenDirs map[string]bool
}

func (c *Options) ApplyDefaults() {
Expand Down Expand Up @@ -102,6 +114,9 @@ func (c *Options) ApplyDefaults() {
"..": true,
removeLeadingSlash(c.ShadowFolder): true,
}
if c.RecyclePath != "" {
c.HiddenDirs[c.RecyclePath] = true
}

if c.DirPerms == 0 {
c.DirPerms = dirPermDefault
Expand All @@ -114,4 +129,12 @@ func (c *Options) ApplyDefaults() {
if c.UserQuotaBytes == 0 {
c.UserQuotaBytes = 50000000000
}

if c.MaxDaysInRecycleList == 0 {
c.MaxDaysInRecycleList = 14
}

if c.MaxRecycleEntries == 0 {
c.MaxRecycleEntries = 2000
}
}

0 comments on commit 83d0ea6

Please sign in to comment.