Skip to content

Commit

Permalink
Introduce spaces field mask (#2888)
Browse files Browse the repository at this point in the history
* introduce spaces filed mask

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* prevent panic when deleting or creating spaces is not permitted

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* return not found when trying to restore a space as non admin

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* allow clients to send a mask with ListStorageSpaces

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic authored May 30, 2022
1 parent fd9e2af commit 036e520
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 29 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/spaces-field-mask.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: introduce spaces field mask

We now use a field mask to select which properties to retrieve when looking up storage spaces. This allows the gateway to only ask for `root` when trying to forward id or path based requests.

https://github.com/cs3org/reva/pull/2888
12 changes: 11 additions & 1 deletion internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,16 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag

func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
// TODO update CS3 api to forward the filters to the registry so it can filter the number of providers the gateway needs to query
filters := map[string]string{}
filters := map[string]string{
// TODO add opaque / CS3 api to expand 'path,root,stat?' properties / field mask
"mask": "*", // fetch all properties when listing storage spaces
}

mask := utils.ReadPlainFromOpaque(req.Opaque, "mask")
if mask != "" {
// TODO check for allowed filters
filters["mask"] = mask
}

for _, f := range req.Filters {
switch f.Type {
Expand Down Expand Up @@ -1093,6 +1102,7 @@ func (s *svc) findSpaces(ctx context.Context, ref *provider.Reference) ([]*regis
}

filters := map[string]string{
"mask": "root", // we only need the root for routing
"path": ref.Path,
}
if ref.ResourceId != nil {
Expand Down
12 changes: 8 additions & 4 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,10 @@ func (s *service) CreateStorageSpace(ctx context.Context, req *provider.CreateSt
}, nil
}

resp.StorageSpace.Id.OpaqueId = storagespace.FormatStorageID(s.conf.MountID, resp.StorageSpace.Id.GetOpaqueId())
resp.StorageSpace.Root.StorageId = storagespace.FormatStorageID(s.conf.MountID, resp.StorageSpace.Root.GetStorageId())
if resp.StorageSpace != nil {
resp.StorageSpace.Id.OpaqueId = storagespace.FormatStorageID(s.conf.MountID, resp.StorageSpace.Id.GetOpaqueId())
resp.StorageSpace.Root.StorageId = storagespace.FormatStorageID(s.conf.MountID, resp.StorageSpace.Root.GetStorageId())
}
return resp, nil
}

Expand Down Expand Up @@ -578,8 +580,10 @@ func (s *service) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt
Msg("failed to update storage space")
return nil, err
}
res.StorageSpace.Id.OpaqueId = storagespace.FormatStorageID(s.conf.MountID, res.StorageSpace.Id.GetOpaqueId())
res.StorageSpace.Root.StorageId = storagespace.FormatStorageID(s.conf.MountID, res.StorageSpace.Root.GetStorageId())
if res.StorageSpace != nil {
res.StorageSpace.Id.OpaqueId = storagespace.FormatStorageID(s.conf.MountID, res.StorageSpace.Id.GetOpaqueId())
res.StorageSpace.Root.StorageId = storagespace.FormatStorageID(s.conf.MountID, res.StorageSpace.Root.GetStorageId())
}
return res, nil
}

Expand Down
4 changes: 3 additions & 1 deletion internal/http/services/owncloud/ocdav/propfind/propfind.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ func (p *Handler) getResourceInfos(ctx context.Context, w http.ResponseWriter, r
}
spacePath := string(space.Opaque.Map["path"].Value)
// TODO separate stats to the path or to the children, after statting all children update the mtime/etag
// TODO get mtime, and size from space as well, so we no longer have to stat here?
// TODO get mtime, and size from space as well, so we no longer have to stat here? would require sending the requested metadata keys as well
// root should be a ResourceInfo so it can contain the full stat, not only the id ... do we even need spaces then?
// metadata keys could all be prefixed with "root." to indicate we want more than the root id ...
spaceRef := spacelookup.MakeRelativeReference(space, requestPath, spacesPropfind)
info, status, err := p.statSpace(ctx, client, space, spaceRef, metadataKeys)
if err != nil || status.GetCode() != rpc.Code_CODE_OK {
Expand Down
56 changes: 41 additions & 15 deletions pkg/storage/registry/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,18 +270,19 @@ func (r *registry) GetProvider(ctx context.Context, space *providerpb.StorageSpa
func (r *registry) ListProviders(ctx context.Context, filters map[string]string) ([]*registrypb.ProviderInfo, error) {
b, _ := strconv.ParseBool(filters["unique"])
unrestricted, _ := strconv.ParseBool(filters["unrestricted"])
mask := filters["mask"]
switch {
case filters["storage_id"] != "" && filters["opaque_id"] != "":
findMountpoint := filters["type"] == "mountpoint"
findGrant := !findMountpoint && filters["path"] == "" // relvative references, by definition, occur in the correct storage, so do not look for grants
return r.findProvidersForResource(ctx, filters["storage_id"]+"!"+filters["opaque_id"], findMountpoint, findGrant, unrestricted), nil
return r.findProvidersForResource(ctx, filters["storage_id"]+"!"+filters["opaque_id"], findMountpoint, findGrant, unrestricted, mask), nil
case filters["path"] != "":
return r.findProvidersForAbsolutePathReference(ctx, filters["path"], b, unrestricted), nil
return r.findProvidersForAbsolutePathReference(ctx, filters["path"], b, unrestricted, mask), nil
case len(filters) == 0:
// return all providers
return r.findAllProviders(ctx), nil
return r.findAllProviders(ctx, mask), nil
default:
return r.findProvidersForFilter(ctx, r.buildFilters(filters), unrestricted), nil
return r.findProvidersForFilter(ctx, r.buildFilters(filters), unrestricted, mask), nil
}
}

Expand Down Expand Up @@ -321,7 +322,7 @@ func (r *registry) buildFilters(filterMap map[string]string) []*providerpb.ListS
return filters
}

func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, unrestricted bool) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, unrestricted bool, _ string) []*registrypb.ProviderInfo {

currentUser := ctxpkg.ContextMustGetUser(ctx)
providerInfos := []*registrypb.ProviderInfo{}
Expand Down Expand Up @@ -367,20 +368,45 @@ func (r *registry) findProvidersForFilter(ctx context.Context, filters []*provid
// findProvidersForResource looks up storage providers based on a resource id
// for the root of a space the res.StorageId is the same as the res.OpaqueId
// for share spaces the res.StorageId tells the registry the spaceid and res.OpaqueId is a node in that space
func (r *registry) findProvidersForResource(ctx context.Context, id string, findMoundpoint, findGrant, unrestricted bool) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForResource(ctx context.Context, id string, findMoundpoint, findGrant, unrestricted bool, mask string) []*registrypb.ProviderInfo {
currentUser := ctxpkg.ContextMustGetUser(ctx)
providerInfos := []*registrypb.ProviderInfo{}
sdid, _ := storagespace.SplitStorageID(id)
providerID, spaceID := storagespace.SplitStorageID(id)
spaceID, nodeID, err := storagespace.SplitID(spaceID)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("splitting spaceid failed")
return nil
}

for address, provider := range r.c.Providers {
// try to find provider based on storageproviderid prefix
if provider.ProviderID != "" && sdid != "" && provider.ProviderID != sdid {
// skip mismatching storageproviders
continue
}
p := &registrypb.ProviderInfo{
Address: address,
ProviderId: id,
ProviderId: providerID,
}
// try to find provider based on storageproviderid prefix if only root is requested
if provider.ProviderID != "" && providerID != "" && provider.ProviderID == providerID && mask == "root" {
// construct space based on configured properties without actually making a ListStorageSpaces call
space := &providerpb.StorageSpace{
Id: &providerpb.StorageSpaceId{OpaqueId: id},
Root: &providerpb.ResourceId{
StorageId: spaceID,
OpaqueId: nodeID,
},
}
// this is a request for requests by id
// setPath(space, provider.Path) // hmm not enough info to build a path.... the space alias is no longer known here we would need to query the provider

validSpaces := []*providerpb.StorageSpace{space}
if err := setSpaces(p, validSpaces); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Interface("provider", provider).Interface("spaces", validSpaces).Msg("marshaling spaces failed, continuing")
return nil
}
providerInfos = append(providerInfos, p)
return providerInfos
}
if provider.ProviderID != "" && providerID != "" && provider.ProviderID != providerID {
// skip mismatching storageproviders
continue
}
filters := []*providerpb.ListStorageSpacesRequest_Filter{{
Type: providerpb.ListStorageSpacesRequest_Filter_TYPE_ID,
Expand Down Expand Up @@ -461,7 +487,7 @@ func (r *registry) findProvidersForResource(ctx context.Context, id string, find

// findProvidersForAbsolutePathReference takes a path and returns the storage provider with the longest matching path prefix
// FIXME use regex to return the correct provider when multiple are configured
func (r *registry) findProvidersForAbsolutePathReference(ctx context.Context, path string, unique, unrestricted bool) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForAbsolutePathReference(ctx context.Context, path string, unique, unrestricted bool, _ string) []*registrypb.ProviderInfo {
currentUser := ctxpkg.ContextMustGetUser(ctx)

deepestMountPath := ""
Expand Down Expand Up @@ -568,7 +594,7 @@ func (r *registry) findProvidersForAbsolutePathReference(ctx context.Context, pa

// findAllProviders returns a list of all storage providers
// This is a dumb call that does not call ListStorageSpaces() on the providers: ListStorageSpaces() in the gateway can cache that better.
func (r *registry) findAllProviders(ctx context.Context) []*registrypb.ProviderInfo {
func (r *registry) findAllProviders(ctx context.Context, _ string) []*registrypb.ProviderInfo {
pis := make([]*registrypb.ProviderInfo, 0, len(r.c.Providers))
for address := range r.c.Providers {
pis = append(pis, &registrypb.ProviderInfo{
Expand Down
22 changes: 14 additions & 8 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,12 +410,6 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
return nil, err
}

if restore {
if err := node.SetDTime(nil); err != nil {
return nil, err
}
}

u, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, fmt.Errorf("decomposedfs: spaces: contextual user not found")
Expand Down Expand Up @@ -462,11 +456,17 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
}

// TODO change the permission handling
// these two attributes need manager permissions
if space.Name != "" || hasDescription {
// these three attributes need manager permissions
if space.Name != "" || hasDescription || restore {
err = fs.checkManagerPermission(ctx, node)
}
if err != nil {
if restore {
// a disabled space is invisible to non admins
return &provider.UpdateStorageSpaceResponse{
Status: &v1beta11.Status{Code: v1beta11.Code_CODE_NOT_FOUND, Message: err.Error()},
}, nil
}
return &provider.UpdateStorageSpaceResponse{
Status: &v1beta11.Status{Code: v1beta11.Code_CODE_PERMISSION_DENIED, Message: err.Error()},
}, nil
Expand All @@ -484,6 +484,12 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
return nil, err
}

if restore {
if err := node.SetDTime(nil); err != nil {
return nil, err
}
}

// send back the updated data from the storage
updatedSpace, err := fs.storageSpaceFromNode(ctx, node, node.InternalPath(), false, false)
if err != nil {
Expand Down

0 comments on commit 036e520

Please sign in to comment.